diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 9c859238249c9..86f30a831165d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2922,6 +2922,15 @@ protected void internalSetNamespaceResourceGroup(String rgName) { internalSetPolicies("resource_group_name", rgName); } + protected CompletableFuture internalSetReplicateSubscriptionStateAsync(Boolean enabled) { + return validatePoliciesReadOnlyAccessAsync() + .thenCompose(__ -> validateNamespacePolicyOperationAsync(namespaceName, + PolicyName.REPLICATED_SUBSCRIPTION, PolicyOperation.WRITE)) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.replicate_subscription_state = enabled; + return policies; + })); + } private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 09507fa73e608..ebd55d0c09006 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -5016,6 +5016,29 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author } } + protected CompletableFuture internalSetEnableReplicatedSubscription(Boolean enabled, + boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setReplicateSubscriptionState(enabled); + topicPolicies.setIsGlobal(isGlobal); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); + } + + protected CompletableFuture internalGetReplicateSubscriptionState(boolean applied, + boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenApply(op -> op.map(TopicPolicies::getReplicateSubscriptionState) + .orElseGet(() -> { + if (applied) { + return getNamespacePolicies(namespaceName).replicate_subscription_state; + } + return null; + })); + } + protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean enabled) { log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index fbe0918b8a836..6a115e3372ff2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -42,6 +42,7 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.api.SubscriptionType; @@ -1946,5 +1947,42 @@ public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant, internalSetNamespaceResourceGroup(null); } + @GET + @Path("/{tenant}/{namespace}/replicateSubscriptionState") + @ApiOperation(value = "Get the enabled status of subscription replication on a namespace.", response = + Boolean.class) + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) + public Boolean getReplicateSubscriptionState(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), + PolicyName.REPLICATED_SUBSCRIPTION, PolicyOperation.READ); + + Policies policies = getNamespacePolicies(namespaceName); + return policies.replicate_subscription_state; + } + + @POST + @Path("/{tenant}/{namespace}/replicateSubscriptionState") + @ApiOperation(value = "Enable or disable subscription replication on a namespace.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) + public void setReplicateSubscriptionState(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @ApiParam(value = "Whether to enable subscription replication", + required = true) + Boolean enabled) { + validateNamespaceName(tenant, namespace); + internalSetReplicateSubscriptionStateAsync(enabled) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("set replicate subscription state failed", ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 1c97821c4cb01..1e1d4ca352721 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -3896,6 +3896,85 @@ public void getReplicatedSubscriptionStatus( internalGetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative); } + @POST + @Path("/{tenant}/{namespace}/{topic}/replicateSubscriptionState") + @ApiOperation(value = "Enable or disable subscription replication on a topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or " + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on this topic"), + @ApiResponse(code = 412, message = "Can't find owner for topic"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) + public void setReplicateSubscriptionState( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Whether to enable subscription replication", required = true) + Boolean enabled) { + validateTopicName(tenant, namespace, encodedTopic); + validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS) + .thenCompose(__ -> preValidation(authoritative)) + .thenCompose(__ -> internalSetEnableReplicatedSubscription(enabled, isGlobal)) + .thenRun(() -> { + log.info( + "[{}] Successfully set topic replicated subscription enabled: tenant={}, namespace={}, " + + "topic={}, isGlobal={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName(), + isGlobal); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setReplicateSubscriptionState", ex, asyncResponse); + return null; + }); + } + + @GET + @Path("/{tenant}/{namespace}/{topic}/replicateSubscriptionState") + @ApiOperation(value = "Get the enabled status of subscription replication on a topic.") + @ApiResponses(value = { + @ApiResponse(code = 401, message = "Don't have permission to administrate resources"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 412, message = "Can't find owner for topic"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void getReplicateSubscriptionState( + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @QueryParam("applied") @DefaultValue("false") boolean applied, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + validateTopicOperationAsync(topicName, TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS) + .thenCompose(__ -> preValidation(authoritative)) + .thenCompose(__ -> internalGetReplicateSubscriptionState(applied, isGlobal)) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getReplicateSubscriptionState", ex, asyncResponse); + return null; + }); + } + @GET @Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy") @ApiOperation(value = "Get schema compatibility strategy on a topic") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 9b20a57506c98..c424ce7dc9a52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -201,6 +201,7 @@ protected void updateTopicPolicy(TopicPolicies data) { topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters()); topicPolicies.getSchemaCompatibilityStrategy() .updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy())); + topicPolicies.getReplicateSubscriptionState().updateTopicValue(data.getReplicateSubscriptionState()); } topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies()); topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic()); @@ -278,6 +279,10 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies); updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); topicPolicies.getResourceGroupName().updateNamespaceValue(namespacePolicies.resource_group_name); + if (!isSystemTopic()) { + topicPolicies.getReplicateSubscriptionState() + .updateNamespaceValue(namespacePolicies.replicate_subscription_state); + } } private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index b0926ee31d9f0..943b1aff07420 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -219,6 +219,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final long connectionLivenessCheckTimeoutMillis = 5000; + private final boolean ignoreConsumerReplicateSubscriptionState; + // Number of bytes pending to be published from a single specific IO thread. private static final FastThreadLocal pendingBytesPerThread = new FastThreadLocal() { @Override @@ -281,6 +283,9 @@ public ServerCnx(PulsarService pulsar, String listenerName) { this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2; this.connectionController = new ConnectionController.DefaultConnectionController(conf); this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null; + this.ignoreConsumerReplicateSubscriptionState = + Boolean.parseBoolean( + conf.getProperties().getProperty("ignoreConsumerReplicateSubscriptionState", "false")); } @Override @@ -984,8 +989,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { ? subscribe.getStartMessageRollbackDurationSec() : -1; final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null; - final Boolean isReplicated = - subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null; + final Boolean isReplicated = ignoreConsumerReplicateSubscriptionState ? null : + (subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null); final boolean forceTopicCreation = subscribe.isForceTopicCreation(); final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f9a347aef9567..9e2e309cc2a0c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -184,6 +184,10 @@ public boolean isReplicated() { public boolean setReplicated(boolean replicated) { replicatedControlled = replicated; + return setReplicated(replicated, true); + } + + public boolean setReplicated(boolean replicated, boolean isPersistent) { ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (!replicated || !config.isEnableReplicatedSubscriptions()) { @@ -193,7 +197,7 @@ public boolean setReplicated(boolean replicated) { config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()); } - if (this.cursor != null) { + if (this.cursor != null && isPersistent) { if (replicated) { if (!config.isEnableReplicatedSubscriptions()) { log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 37d32782d0401..386c8639c737d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -277,7 +277,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS } else { final String subscriptionName = Codec.decode(cursor.getName()); subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, - PersistentSubscription.isCursorFromReplicatedSubscription(cursor), + PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true : null, cursor.getCursorProperties())); // subscription-cursor gets activated by default: deactivate as there is no active subscription right // now @@ -2932,8 +2932,14 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem } public synchronized void checkReplicatedSubscriptionControllerState() { + Boolean replicatedSubscriptionStatus = topicPolicies.getReplicateSubscriptionState().get(); AtomicBoolean shouldBeEnabled = new AtomicBoolean(false); subscriptions.forEach((name, subscription) -> { + // If the subscription does not have a replicated flag configured, please apply the topic policies to the + // subscription. + if (subscription.getReplicatedControlled() == null) { + subscription.setReplicated(replicatedSubscriptionStatus != null && replicatedSubscriptionStatus, false); + } if (subscription.isReplicated()) { shouldBeEnabled.set(true); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiReplicateSubscriptionStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiReplicateSubscriptionStateTest.java new file mode 100644 index 0000000000000..7c185c19f5826 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiReplicateSubscriptionStateTest.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.awaitility.Awaitility.await; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +public class AdminApiReplicateSubscriptionStateTest extends MockedPulsarServiceBaseTest { + @BeforeClass + @Override + public void setup() throws Exception { + super.internalSetup(); + super.setupDefaultTenantAndNamespace(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + } + + @AfterClass + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testReplicateSubscriptionStateOnNamespaceLevel() throws PulsarAdminException { + String nsName = "public/testReplicateSubscriptionState" + System.nanoTime(); + admin.namespaces().createNamespace(nsName); + assertNull(admin.namespaces().getReplicateSubscriptionState(nsName)); + + String topicName = nsName + "/topic" + System.nanoTime(); + admin.topics().createNonPartitionedTopic(topicName); + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, true)); + + admin.namespaces().setReplicateSubscriptionState(nsName, true); + assertTrue(admin.namespaces().getReplicateSubscriptionState(nsName)); + await().untilAsserted(() -> { + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, false)); + assertTrue(admin.topicPolicies().getReplicateSubscriptionState(topicName, true)); + }); + + admin.namespaces().setReplicateSubscriptionState(nsName, false); + assertFalse(admin.namespaces().getReplicateSubscriptionState(nsName)); + await().untilAsserted(() -> { + assertFalse(admin.topicPolicies().getReplicateSubscriptionState(topicName, true)); + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, false)); + }); + + admin.namespaces().setReplicateSubscriptionState(nsName, null); + assertNull(admin.namespaces().getReplicateSubscriptionState(nsName)); + await().untilAsserted(() -> { + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, true)); + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, false)); + }); + } + + @Test + public void testReplicateSubscriptionStateOnTopicLevel() throws PulsarAdminException { + String nsName = "public/testReplicateSubscriptionState" + System.nanoTime(); + admin.namespaces().createNamespace(nsName); + assertNull(admin.namespaces().getReplicateSubscriptionState(nsName)); + + String topicName = nsName + "/topic" + System.nanoTime(); + admin.topics().createNonPartitionedTopic(topicName); + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, true)); + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, false)); + + admin.topicPolicies().setReplicateSubscriptionState(topicName, true); + await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getReplicateSubscriptionState(topicName, true), Boolean.TRUE); + assertEquals(admin.topicPolicies().getReplicateSubscriptionState(topicName, false), Boolean.TRUE); + }); + + admin.topicPolicies().setReplicateSubscriptionState(topicName, false); + await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getReplicateSubscriptionState(topicName, true), Boolean.FALSE); + assertEquals(admin.topicPolicies().getReplicateSubscriptionState(topicName, false), Boolean.FALSE); + }); + + admin.topicPolicies().setReplicateSubscriptionState(topicName, null); + await().untilAsserted(() -> { + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, true)); + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, true)); + }); + } + + @DataProvider + Object[] replicateSubscriptionStatePriorityLevelDataProvider() { + return new Object[]{ + true, + false, + null, + }; + } + + @Test(dataProvider = "replicateSubscriptionStatePriorityLevelDataProvider") + public void testReplicateSubscriptionStatePriorityLevel(Boolean enabledOnNamespace) + throws PulsarAdminException { + String nsName = "public/testReplicateSubscriptionState" + System.nanoTime(); + admin.namespaces().createNamespace(nsName); + assertNull(admin.namespaces().getReplicateSubscriptionState(nsName)); + admin.namespaces().setReplicateSubscriptionState(nsName, enabledOnNamespace); + + String topicName = nsName + "/topic" + System.nanoTime(); + admin.topics().createNonPartitionedTopic(topicName); + + admin.topicPolicies().setReplicateSubscriptionState(topicName, false); + await().untilAsserted(() -> { + assertFalse(admin.topicPolicies().getReplicateSubscriptionState(topicName, true)); + assertFalse(admin.topicPolicies().getReplicateSubscriptionState(topicName, false)); + }); + + admin.topicPolicies().setReplicateSubscriptionState(topicName, true); + await().untilAsserted(() -> { + assertTrue(admin.topicPolicies().getReplicateSubscriptionState(topicName, true)); + assertTrue(admin.topicPolicies().getReplicateSubscriptionState(topicName, false)); + }); + + admin.topicPolicies().setReplicateSubscriptionState(topicName, null); + await().untilAsserted(() -> { + assertNull(admin.topicPolicies().getReplicateSubscriptionState(topicName, false)); + assertEquals(admin.topicPolicies().getReplicateSubscriptionState(topicName, true), enabledOnNamespace); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/IgnoreConsumerReplicateSubscriptionStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/IgnoreConsumerReplicateSubscriptionStateTest.java new file mode 100644 index 0000000000000..b73aaec4bbfa3 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/IgnoreConsumerReplicateSubscriptionStateTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.AssertJUnit.assertTrue; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class IgnoreConsumerReplicateSubscriptionStateTest extends MockedPulsarServiceBaseTest { + @BeforeClass + @Override + public void setup() throws Exception { + super.internalSetup(); + super.setupDefaultTenantAndNamespace(); + } + + @AfterClass + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + Properties properties = conf.getProperties(); + if (properties==null) { + properties = new Properties(); + } + properties.put("ignoreConsumerReplicateSubscriptionState", "true"); + } + + @DataProvider + Object[] replicateSubscriptionState(){ + return new Object[]{true,false}; + } + + @Test(dataProvider = "replicateSubscriptionState") + public void ignoreConsumerReplicateSubscriptionState(boolean enabled) + throws PulsarClientException, ExecutionException, InterruptedException { + String topicName = "topic-" + System.nanoTime(); + String subName = "sub-" + System.nanoTime(); + @Cleanup Consumer ignored = + pulsarClient.newConsumer().topic(topicName).replicateSubscriptionState(enabled) + .subscriptionName(subName).subscribe(); + Optional topicOptional = pulsar.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional.isPresent()); + PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get(); + PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName); + assertNotNull(persistentSubscription); + assertNull(persistentSubscription.getReplicatedControlled()); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java index 3175a33ced5b8..34682d540ac28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -35,6 +36,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Test(groups = "broker-api") public class ReplicateSubscriptionTest extends ProducerConsumerBase { @BeforeClass @@ -53,6 +55,9 @@ protected void cleanup() throws Exception { @Override protected void doInitConf() throws Exception { super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + conf.setEnableReplicatedSubscriptions(true); } @DataProvider @@ -93,4 +98,123 @@ public void testReplicateSubscriptionState(Boolean replicateSubscriptionState) return true; }); } + + @DataProvider + public Object[][] replicateSubscriptionStateMultipleLevel() { + return new Object[][]{ + // consumer level high priority. + {Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, true}, + {Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, false}, + {Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, true}, + {Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, false}, + {Boolean.FALSE, Boolean.TRUE, Boolean.TRUE, true}, + {Boolean.FALSE, Boolean.TRUE, Boolean.TRUE, false}, + + // namespace level high priority + {null, Boolean.TRUE, null, true}, + {null, Boolean.TRUE, null, false}, + {null, Boolean.FALSE, null, true}, + {null, Boolean.FALSE, null, false}, + + // topic level high priority. + {null, Boolean.TRUE, Boolean.TRUE, true}, + {null, Boolean.TRUE, Boolean.TRUE, false}, + {null, Boolean.TRUE, Boolean.FALSE, true}, + {null, Boolean.TRUE, Boolean.FALSE, false}, + {null, Boolean.FALSE, Boolean.TRUE, true}, + {null, Boolean.FALSE, Boolean.TRUE, false}, + + // All higher levels are null. + {null, null, null, true}, + {null, null, null, false} + }; + } + + /** + * The priority list is from high to low: consumer/subscription, topic, namespace. + */ + @Test(dataProvider = "replicateSubscriptionStateMultipleLevel") + public void testReplicateSubscriptionStatePriority( + Boolean consumerReplicateSubscriptionState, + Boolean replicateSubscriptionEnabledOnNamespaceLevel, + Boolean replicateSubscriptionEnabledOnTopicLevel, + boolean replicatedSubscriptionStatus + ) throws Exception { + String nsName = "my-property/my-ns-" + System.nanoTime(); + admin.namespaces().createNamespace(nsName); + String topic = "persistent://" + nsName + "/" + System.nanoTime(); + String subName = "sub"; + @Cleanup + Consumer ignored = null; + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName); + if (consumerReplicateSubscriptionState != null) { + consumerBuilder.replicateSubscriptionState(consumerReplicateSubscriptionState); + } + ignored = consumerBuilder.subscribe(); + + CompletableFuture> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic); + Optional topicOptional = topicIfExists.get(); + assertTrue(topicOptional.isPresent()); + Topic topicRef = topicOptional.get(); + Subscription subscription = topicRef.getSubscription(subName); + assertNotNull(subscription); + PersistentSubscription persistentSubscription = (PersistentSubscription) subscription; + + // Verify the consumer level. + assertEquals(persistentSubscription.getReplicatedControlled(), consumerReplicateSubscriptionState); + assertEquals(persistentSubscription.isReplicated(), + consumerReplicateSubscriptionState != null && consumerReplicateSubscriptionState); + + // Verify the namespace level. + admin.namespaces().setReplicateSubscriptionState(nsName, replicateSubscriptionEnabledOnNamespaceLevel); + await().untilAsserted(() -> { + assertEquals(admin.namespaces().getReplicateSubscriptionState(nsName), + replicateSubscriptionEnabledOnNamespaceLevel); + assertEquals(admin.topicPolicies().getReplicateSubscriptionState(topic, true), + replicateSubscriptionEnabledOnNamespaceLevel); + if (consumerReplicateSubscriptionState == null) { + // Using namespace policy. + assertEquals(persistentSubscription.isReplicated(), replicateSubscriptionEnabledOnNamespaceLevel != null + && replicateSubscriptionEnabledOnNamespaceLevel); + } else { + // Using subscription policy. + assertEquals(persistentSubscription.isReplicated(), + consumerReplicateSubscriptionState.booleanValue()); + } + }); + + // Verify the topic level. + admin.topicPolicies().setReplicateSubscriptionState(topic, replicateSubscriptionEnabledOnTopicLevel); + await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getReplicateSubscriptionState(topic, false), + replicateSubscriptionEnabledOnTopicLevel); + Boolean replicateSubscriptionState = admin.topicPolicies().getReplicateSubscriptionState(topic, true); + assertTrue(replicateSubscriptionState == replicateSubscriptionEnabledOnTopicLevel + || replicateSubscriptionState == replicateSubscriptionEnabledOnNamespaceLevel); + if (consumerReplicateSubscriptionState == null) { + if (replicateSubscriptionEnabledOnTopicLevel != null) { + // Using topic policy. + assertEquals(persistentSubscription.isReplicated(), + replicateSubscriptionEnabledOnTopicLevel.booleanValue()); + } else { + // Using namespace policy. + assertEquals(persistentSubscription.isReplicated(), + replicateSubscriptionEnabledOnNamespaceLevel != null + && replicateSubscriptionEnabledOnNamespaceLevel); + } + } else { + // Using subscription policy. + assertEquals(persistentSubscription.isReplicated(), + consumerReplicateSubscriptionState.booleanValue()); + } + }); + + // Verify the subscription level takes priority over the topic and namespace level. + admin.topics().setReplicatedSubscriptionStatus(topic, subName, replicatedSubscriptionStatus); + await().untilAsserted(() -> { + assertEquals(persistentSubscription.isReplicated(), replicatedSubscriptionStatus); + }); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 7edba8fbfcd2c..df678203af0d5 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -4372,4 +4372,53 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem * @return */ CompletableFuture removeNamespaceResourceGroupAsync(String namespace); + + /** + * Enable or disable subscription replication on a namespace. + * + * @param namespace Namespace name + * @param enabled The replication status to set: + *
    + *
  • true: Enable subscription replication.
  • + *
  • false: Disable subscription replication.
  • + *
  • null: Remove config.
  • + *
+ */ + void setReplicateSubscriptionState(String namespace, Boolean enabled) throws PulsarAdminException; + + /** + * Enable or disable subscription replication on a namespace asynchronously. + * + * @param namespace Namespace name + * @param enabled The replication status to set: + *
    + *
  • true: Enable subscription replication.
  • + *
  • false: Disable subscription replication.
  • + *
  • null: Remove config.
  • + *
+ */ + CompletableFuture setReplicateSubscriptionStateAsync(String namespace, Boolean enabled); + + /** + * Get the enabled status of subscription replication on a namespace. + * + * @param namespace Namespace name + * @return true Subscriptions replication is enabled. + * false Subscriptions replication is disabled. + * null Subscriptions replication is not configured. + */ + Boolean getReplicateSubscriptionState(String namespace) throws PulsarAdminException; + + /** + * Get the enabled status of subscription replication on a namespace asynchronously. + * + * @param namespace Namespace name + * @return A {@link CompletableFuture} that will complete with the replication status: + *
    + *
  • true: Subscriptions replication is enabled.
  • + *
  • false: Subscriptions replication is disabled.
  • + *
  • null: Subscriptions replication is not configured.
  • + *
+ */ + CompletableFuture getReplicateSubscriptionStateAsync(String namespace); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java index 644406fb25def..191ad8a7a1df3 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java @@ -1776,4 +1776,53 @@ SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean * @param topic Topic name */ CompletableFuture removeResourceGroupAsync(String topic); + + /** + * Enable or disable subscription replication on a topic. + * + * @param topic Topic name + * @param enabled The replication status to set: + *
    + *
  • true: Enable subscription replication.
  • + *
  • false: Disable subscription replication.
  • + *
  • null: Remove config.
  • + *
+ */ + void setReplicateSubscriptionState(String topic, Boolean enabled) throws PulsarAdminException; + + /** + * Enable or disable subscription replication on a topic asynchronously. + * + * @param topic Topic name + * @param enabled The replication status to set: + *
    + *
  • true: Enable subscription replication.
  • + *
  • false: Disable subscription replication.
  • + *
  • null: Remove config.
  • + *
+ */ + CompletableFuture setReplicateSubscriptionStateAsync(String topic, Boolean enabled); + + /** + * Get the enabled status of subscription replication on a topic. + * + * @param topic Topic name + * @return true Subscriptions replication is enabled. + * false Subscriptions replication is disabled. + * null Subscriptions replication is not configured. + */ + Boolean getReplicateSubscriptionState(String topic, boolean applied) throws PulsarAdminException; + + /** + * Get the enabled status of subscription replication on a topic. + * + * @param topic Topic name + * @return A {@link CompletableFuture} that will complete with the replication status: + *
    + *
  • true: Subscriptions replication is enabled.
  • + *
  • false: Subscriptions replication is disabled.
  • + *
  • null: Subscriptions replication is not configured.
  • + *
+ */ + CompletableFuture getReplicateSubscriptionStateAsync(String topic, boolean applied); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index ebcd7fc09c19e..68eb1c813faae 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -124,6 +124,9 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public String resource_group_name = null; + @SuppressWarnings("checkstyle:MemberName") + public Boolean replicate_subscription_state; + public enum BundleType { LARGEST, HOT; } @@ -151,7 +154,8 @@ public int hashCode() { offload_policies, subscription_types_enabled, properties, - resource_group_name); + resource_group_name, + replicate_subscription_state); } @Override @@ -196,7 +200,8 @@ public boolean equals(Object obj) { && Objects.equals(offload_policies, other.offload_policies) && Objects.equals(subscription_types_enabled, other.subscription_types_enabled) && Objects.equals(properties, other.properties) - && Objects.equals(resource_group_name, other.resource_group_name); + && Objects.equals(resource_group_name, other.resource_group_name) + && Objects.equals(replicate_subscription_state, other.replicate_subscription_state); } return false; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 9b9f1f073f9ab..525516ecc6bae 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -2514,4 +2514,41 @@ private WebTarget namespacePath(NamespaceName namespace, String... parts) { namespacePath = WebTargets.addParts(namespacePath, parts); return namespacePath; } + + @Override + public void setReplicateSubscriptionState(String namespace, Boolean enabled) throws PulsarAdminException { + sync(() -> setReplicateSubscriptionStateAsync(namespace, enabled)); + } + + @Override + public CompletableFuture setReplicateSubscriptionStateAsync(String namespace, Boolean enabled) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "replicateSubscriptionState"); + return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); + } + + @Override + public Boolean getReplicateSubscriptionState(String namespace) throws PulsarAdminException { + return sync(() -> getReplicateSubscriptionStateAsync(namespace)); + } + + @Override + public CompletableFuture getReplicateSubscriptionStateAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "replicateSubscriptionState"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Boolean enabled) { + future.complete(enabled); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java index 60b49507e33fb..de1e347846d77 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java @@ -1468,6 +1468,45 @@ public CompletableFuture removeResourceGroupAsync(String topic) { return setResourceGroupAsync(topic, null); } + @Override + public void setReplicateSubscriptionState(String topic, Boolean enabled) throws PulsarAdminException { + sync(() -> setReplicateSubscriptionStateAsync(topic, enabled)); + } + + @Override + public CompletableFuture setReplicateSubscriptionStateAsync(String topic, Boolean enabled) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "replicateSubscriptionState"); + return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); + } + + @Override + public Boolean getReplicateSubscriptionState(String topic, boolean applied) + throws PulsarAdminException { + return sync(() -> getReplicateSubscriptionStateAsync(topic, applied)); + } + + @Override + public CompletableFuture getReplicateSubscriptionStateAsync(String topic, boolean applied) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "replicateSubscriptionState"); + path = path.queryParam("applied", applied); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Boolean enabled) { + future.complete(enabled); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + /* * returns topic name with encoded Local Name */ diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 7e72117a66fe6..f36cec5f38d9f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -2515,6 +2515,46 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Set replicate subscription state from a namespace") + private class SetReplicateSubscriptionState extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Parameter(names = "--enabled", arity = 1, required = true, description = "Whether to replicate subscription" + + " state") + private boolean enabled = true; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().setReplicateSubscriptionState(namespace, enabled); + } + } + + @Parameters(commandDescription = "Get replicate subscription state from a namespace") + private class GetReplicateSubscriptionState extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(getAdmin().namespaces().getReplicateSubscriptionState(namespace)); + } + } + + @Parameters(commandDescription = "Remove replicate subscription state from a namespace") + private class RemoveReplicateSubscriptionState extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().setReplicateSubscriptionState(namespace, null); + } + } + public CmdNamespaces(Supplier admin) { super("namespaces", admin); jcommander.addCommand("list", new GetNamespacesPerProperty()); @@ -2694,5 +2734,9 @@ public CmdNamespaces(Supplier admin) { jcommander.addCommand("get-resource-group", new GetResourceGroup()); jcommander.addCommand("set-resource-group", new SetResourceGroup()); jcommander.addCommand("remove-resource-group", new RemoveResourceGroup()); + + jcommander.addCommand("set-replicate-subscription-state", new SetReplicateSubscriptionState()); + jcommander.addCommand("get-replicate-subscription-state", new GetReplicateSubscriptionState()); + jcommander.addCommand("remove-replicate-subscription-state", new RemoveReplicateSubscriptionState()); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index f718ba7877138..61fbe9f646828 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -149,6 +149,10 @@ public CmdTopicPolicies(Supplier admin) { jcommander.addCommand("set-resource-group", new SetResourceGroup()); jcommander.addCommand("get-resource-group", new GetResourceGroup()); jcommander.addCommand("remove-resource-group", new RemoveResourceGroup()); + + jcommander.addCommand("set-replicate-subscription-state", new SetReplicateSubscriptionState()); + jcommander.addCommand("get-replicate-subscription-state", new GetReplicateSubscriptionState()); + jcommander.addCommand("remove-replicate-subscription-state", new RemoveReplicateSubscriptionState()); } @Parameters(commandDescription = "Get max consumers per subscription for a topic") @@ -1801,6 +1805,61 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Set replicate subscription state from a topic") + private class SetReplicateSubscriptionState extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Parameter(names = "--enabled", arity = 1, required = true, description = "Whether to replicate subscription" + + " state") + private boolean enabled = true; + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + getTopicPolicies(isGlobal).setReplicateSubscriptionState(topic, enabled); + } + } + + @Parameters(commandDescription = "Get replicate subscription state from a topic") + private class GetReplicateSubscriptionState extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Parameter(names = {"--applied", "-a"}, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + print(getTopicPolicies(isGlobal).getReplicateSubscriptionState(topic, applied)); + } + } + + @Parameters(commandDescription = "Remove replicate subscription state from a topic") + private class RemoveReplicateSubscriptionState extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + getTopicPolicies(isGlobal).setReplicateSubscriptionState(topic, null); + } + } + private TopicPolicies getTopicPolicies(boolean isGlobal) { return getAdmin().topicPolicies(isGlobal); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 53b119c039c99..dda6cb5d018fa 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -57,7 +57,7 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue subscriptionDispatchRate; final PolicyHierarchyValue schemaCompatibilityStrategy; final PolicyHierarchyValue dispatchRate; - + final PolicyHierarchyValue replicateSubscriptionState; final PolicyHierarchyValue resourceGroupName; public HierarchyTopicPolicies() { @@ -89,5 +89,6 @@ public HierarchyTopicPolicies() { schemaCompatibilityStrategy = new PolicyHierarchyValue<>(); dispatchRate = new PolicyHierarchyValue<>(); resourceGroupName = new PolicyHierarchyValue<>(); + replicateSubscriptionState = new PolicyHierarchyValue<>(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java index 2b115023f9d21..42d8ecff22224 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java @@ -50,5 +50,6 @@ public enum PolicyName { ENCRYPTION, TTL, MAX_TOPICS, - RESOURCEGROUP + RESOURCEGROUP, + REPLICATED_SUBSCRIPTION, } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 2ec4b18a1dbe1..b5f452b3bd97b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -72,7 +72,7 @@ public class TopicPolicies { private DispatchRateImpl replicatorDispatchRate; private SchemaCompatibilityStrategy schemaCompatibilityStrategy; private String resourceGroupName; - + private Boolean replicateSubscriptionState; public boolean isGlobalPolicies() { return isGlobal != null && isGlobal; } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ReplicateSubscriptionStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ReplicateSubscriptionStateTest.java new file mode 100644 index 0000000000000..00dff104156cc --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ReplicateSubscriptionStateTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.tests.integration.cli; + +import static org.awaitility.Awaitility.await; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarCliTestSuite; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ReplicateSubscriptionStateTest extends PulsarCliTestSuite { + @BeforeClass(alwaysRun = true) + @Override + public void before() throws Exception { + enableTopicPolicies(); + super.before(); + } + + @BeforeClass(alwaysRun = true) + @Override + public void after() throws Exception { + super.after(); + } + + @Test + public void testReplicateSubscriptionStateCmd() throws Exception { + TopicName topicName = TopicName.get(generateTopicName("testReplicateSubscriptionState", true)); + pulsarAdmin.topics().createNonPartitionedTopic(topicName.toString()); + + String topicNameString = topicName.toString(); + String namesapceNameString = topicName.getNamespace(); + + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "get-replicate-subscription-state", namesapceNameString); + assertEquals(result.getStdout().trim(), "null"); + assertEquals(result.getExitCode(), 0); + result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", + "get-schema-compatibility-strategy", topicNameString); + assertEquals(result.getStdout().trim(), "null"); + assertEquals(result.getExitCode(), 0); + + result = pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-replicate-subscription-state", "--enabled", "true", namesapceNameString); + assertEquals(result.getExitCode(), 0); + result = pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "get-replicate-subscription-state", namesapceNameString); + assertEquals(result.getExitCode(), 0); + assertEquals(result.getStdout().trim(), "true"); + result = pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-replicate-subscription-state", "--enabled", "false", namesapceNameString); + assertEquals(result.getExitCode(), 0); + result = pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "get-replicate-subscription-state", namesapceNameString); + assertEquals(result.getExitCode(), 0); + assertEquals(result.getStdout().trim(), "false"); + result = pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "remove-replicate-subscription-state", namesapceNameString); + assertEquals(result.getExitCode(), 0); + result = pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "get-replicate-subscription-state", namesapceNameString); + assertEquals(result.getExitCode(), 0); + assertEquals(result.getStdout().trim(), "null"); + + result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", "get-replicate-subscription-state", + "--applied", topicNameString); + assertEquals(result.getExitCode(), 0); + assertEquals(result.getStdout().trim(), "null"); + result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", "get-replicate-subscription-state", + topicNameString); + assertEquals(result.getExitCode(), 0); + assertEquals(result.getStdout().trim(), "null"); + + result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", + "set-replicate-subscription-state", "--enabled", "false", topicNameString); + assertEquals(result.getExitCode(), 0); + await().untilAsserted(() -> { + ContainerExecResult r = + pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", "get-replicate-subscription-state", + topicNameString); + assertEquals(r.getExitCode(), 0); + assertEquals(r.getStdout().trim(), "false"); + }); + await().untilAsserted(() -> { + ContainerExecResult r = + pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", "get-replicate-subscription-state", + "--applied", + topicNameString); + assertEquals(r.getExitCode(), 0); + assertEquals(r.getStdout().trim(), "false"); + }); + + result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", + "set-replicate-subscription-state --enabled true", topicNameString); + assertEquals(result.getExitCode(), 0); + await().untilAsserted(() -> { + ContainerExecResult r = + pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", "get-replicate-subscription-state", + topicNameString); + assertEquals(r.getExitCode(), 0); + assertEquals(r.getStdout().trim(), "true"); + }); + await().untilAsserted(() -> { + ContainerExecResult r = + pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", "get-replicate-subscription-state", + "--applied", + topicNameString); + assertEquals(r.getExitCode(), 0); + assertEquals(r.getStdout().trim(), "true"); + }); + + result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", + "remove-replicate-subscription-state", topicNameString); + assertEquals(result.getExitCode(), 0); + await().untilAsserted(() -> { + ContainerExecResult r = + pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", "get-replicate-subscription-state", + topicNameString); + assertEquals(r.getExitCode(), 0); + assertEquals(r.getStdout().trim(), "null"); + }); + } + + @Test + public void testReplicateSubscriptionStateCmdWithInvalidParameters() throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-replicate-subscription-state", "public/default"); + assertTrue(result.getExitCode() != 0); + result = pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-replicate-subscription-state --enabled", "public/default"); + assertTrue(result.getExitCode() != 0); + + result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", + "set-replicate-subscription-state", "public/default/test"); + assertTrue(result.getExitCode() != 0); + result = pulsarCluster.runAdminCommandOnAnyBroker("topicPolicies", + "set-replicate-subscription-state --enabled", "public/default/test"); + assertTrue(result.getExitCode() != 0); + } +} \ No newline at end of file diff --git a/tests/integration/src/test/resources/pulsar-cli.xml b/tests/integration/src/test/resources/pulsar-cli.xml index 70b4136f2ca76..eadecae10169c 100644 --- a/tests/integration/src/test/resources/pulsar-cli.xml +++ b/tests/integration/src/test/resources/pulsar-cli.xml @@ -31,6 +31,7 @@ +