From 843652371c869ff7cf3a6e5012584f8ca5081ffb Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 10 Dec 2024 18:25:10 +0800 Subject: [PATCH] feat: add replicateSubscriptionsEnabled for namespace and topic Signed-off-by: Zixuan Liu --- .../broker/admin/impl/NamespacesBase.java | 9 + .../admin/impl/PersistentTopicsBase.java | 23 ++ .../pulsar/broker/admin/v2/Namespaces.java | 30 +++ .../broker/admin/v2/PersistentTopics.java | 79 +++++++ .../pulsar/broker/service/AbstractTopic.java | 5 + .../pulsar/broker/service/ServerCnx.java | 9 +- .../broker/service/SubscriptionOption.java | 2 +- .../nonpersistent/NonPersistentTopic.java | 2 +- .../persistent/PersistentSubscription.java | 27 ++- .../service/persistent/PersistentTopic.java | 27 ++- ...nApiReplicateSubscriptionsEnabledTest.java | 157 +++++++++++++ ...onsumerReplicateSubscriptionStateTest.java | 86 +++++++ .../client/api/ReplicateSubscriptionTest.java | 216 ++++++++++++++++++ .../pulsar/client/admin/Namespaces.java | 49 ++++ .../pulsar/client/admin/TopicPolicies.java | 49 ++++ .../pulsar/common/policies/data/Policies.java | 9 +- .../client/admin/internal/NamespacesImpl.java | 37 +++ .../admin/internal/TopicPoliciesImpl.java | 39 ++++ .../pulsar/client/impl/ConsumerImpl.java | 2 +- .../impl/conf/ConsumerConfigurationData.java | 10 +- .../policies/data/HierarchyTopicPolicies.java | 3 +- .../common/policies/data/PolicyName.java | 3 +- .../common/policies/data/TopicPolicies.java | 2 +- .../pulsar/common/protocol/Commands.java | 7 +- 24 files changed, 851 insertions(+), 31 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiReplicateSubscriptionsEnabledTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/IgnoreConsumerReplicateSubscriptionStateTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 9c859238249c98..280df5119858a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2922,6 +2922,15 @@ protected void internalSetNamespaceResourceGroup(String rgName) { internalSetPolicies("resource_group_name", rgName); } + protected void internalSetReplicateSubscriptionsEnabled(Boolean enabled) { + validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATED_SUBSCRIPTION, + PolicyOperation.WRITE); + validatePoliciesReadOnlyAccess(); + updatePolicies(namespaceName, policies -> { + policies.replicate_subscriptions_enabled = enabled; + return policies; + }); + } private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 09507fa73e608d..6935e1e26bf69e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -5016,6 +5016,29 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author } } + protected CompletableFuture internalSetEnableReplicatedSubscription(Boolean enabled, + boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setReplicateSubscriptionEnabled(enabled); + topicPolicies.setIsGlobal(isGlobal); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); + } + + protected CompletableFuture internalGetReplicateSubscriptionsEnabled(boolean applied, + boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenApply(op -> op.map(TopicPolicies::getReplicateSubscriptionEnabled) + .orElseGet(() -> { + if (applied) { + return getNamespacePolicies(namespaceName).replicate_subscriptions_enabled; + } + return null; + })); + } + protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean enabled) { log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index fbe0918b8a8363..d0865799fb41a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1946,5 +1946,35 @@ public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant, internalSetNamespaceResourceGroup(null); } + @GET + @Path("/{tenant}/{namespace}/replicateSubscriptionsEnabled") + @ApiOperation(value = "Get the enabled status of subscriptions replication on a namespace.", response = + Boolean.class) + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) + public Boolean getReplicateSubscriptionsEnabled(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), + PolicyName.REPLICATED_SUBSCRIPTION, PolicyOperation.READ); + + Policies policies = getNamespacePolicies(namespaceName); + return policies.replicate_subscriptions_enabled; + } + + @POST + @Path("/{tenant}/{namespace}/replicateSubscriptionsEnabled") + @ApiOperation(value = "Enable or disable subscriptions replication on a namespace.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) + public void setReplicateSubscriptionsEnabled(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @ApiParam(value = "Whether to enable subscriptions replication", + required = true) + Boolean enabled) { + validateNamespaceName(tenant, namespace); + internalSetReplicateSubscriptionsEnabled(enabled); + } + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 1c97821c4cb019..245197666b30ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -3896,6 +3896,85 @@ public void getReplicatedSubscriptionStatus( internalGetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative); } + @POST + @Path("/{tenant}/{namespace}/{topic}/replicateSubscriptionsEnabled") + @ApiOperation(value = "Enable or disable subscriptions replication on a topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or " + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on this topic"), + @ApiResponse(code = 412, message = "Can't find owner for topic"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) + public void setReplicateSubscriptionsEnabled( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Whether to enable subscriptions replication", required = true) + Boolean enabled) { + validateTopicName(tenant, namespace, encodedTopic); + validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS) + .thenCompose(__ -> preValidation(authoritative)) + .thenCompose(__ -> internalSetEnableReplicatedSubscription(enabled, isGlobal)) + .thenRun(() -> { + log.info( + "[{}] Successfully set topic replicated subscription enabled: tenant={}, namespace={}, " + + "topic={}, isGlobal={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName(), + isGlobal); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setReplicateSubscriptionsEnabled", ex, asyncResponse); + return null; + }); + } + + @GET + @Path("/{tenant}/{namespace}/{topic}/replicateSubscriptionsEnabled") + @ApiOperation(value = "Get the enabled status of subscriptions replication on a topic.") + @ApiResponses(value = { + @ApiResponse(code = 401, message = "Don't have permission to administrate resources"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 412, message = "Can't find owner for topic"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void getReplicateSubscriptionsEnabled( + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @QueryParam("applied") @DefaultValue("false") boolean applied, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + validateTopicOperationAsync(topicName, TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS) + .thenCompose(__ -> preValidation(authoritative)) + .thenCompose(__ -> internalGetReplicateSubscriptionsEnabled(applied, isGlobal)) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getReplicateSubscriptionsEnabled", ex, asyncResponse); + return null; + }); + } + @GET @Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy") @ApiOperation(value = "Get schema compatibility strategy on a topic") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 9b20a57506c98c..95942cdce857dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -201,6 +201,7 @@ protected void updateTopicPolicy(TopicPolicies data) { topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters()); topicPolicies.getSchemaCompatibilityStrategy() .updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy())); + topicPolicies.getReplicateSubscriptionsEnabled().updateTopicValue(data.getReplicateSubscriptionEnabled()); } topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies()); topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic()); @@ -278,6 +279,10 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies); updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); topicPolicies.getResourceGroupName().updateNamespaceValue(namespacePolicies.resource_group_name); + if (!isSystemTopic()) { + topicPolicies.getReplicateSubscriptionsEnabled() + .updateNamespaceValue(namespacePolicies.replicate_subscriptions_enabled); + } } private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index aafcb5eeb486df..943b1aff074207 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -219,6 +219,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final long connectionLivenessCheckTimeoutMillis = 5000; + private final boolean ignoreConsumerReplicateSubscriptionState; + // Number of bytes pending to be published from a single specific IO thread. private static final FastThreadLocal pendingBytesPerThread = new FastThreadLocal() { @Override @@ -281,6 +283,9 @@ public ServerCnx(PulsarService pulsar, String listenerName) { this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2; this.connectionController = new ConnectionController.DefaultConnectionController(conf); this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null; + this.ignoreConsumerReplicateSubscriptionState = + Boolean.parseBoolean( + conf.getProperties().getProperty("ignoreConsumerReplicateSubscriptionState", "false")); } @Override @@ -984,8 +989,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { ? subscribe.getStartMessageRollbackDurationSec() : -1; final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null; - final boolean isReplicated = subscribe.hasReplicateSubscriptionState() - && subscribe.isReplicateSubscriptionState(); + final Boolean isReplicated = ignoreConsumerReplicateSubscriptionState ? null : + (subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null); final boolean forceTopicCreation = subscribe.isForceTopicCreation(); final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index d7503f6e90d4aa..8d0e7ab32803ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -45,7 +45,7 @@ public class SubscriptionOption { private boolean readCompacted; private CommandSubscribe.InitialPosition initialPosition; private long startMessageRollbackDurationSec; - private boolean replicatedSubscriptionStateArg; + private Boolean replicatedSubscriptionStateArg; private KeySharedMeta keySharedMeta; private Optional> subscriptionProperties; private long consumerEpoch; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 62ccc3a259586f..f334f92a3a117d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -246,7 +246,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(), option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), option.getInitialPosition(), - option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), + option.getStartMessageRollbackDurationSec(), option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 937e9ba1fd1b92..7cbdb0c2e90481 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -115,13 +115,15 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private final PendingAckHandle pendingAckHandle; private volatile Map subscriptionProperties; private volatile CompletableFuture fenceFuture; + private volatile Boolean replicatedControlled; static { REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } - static Map getBaseCursorProperties(boolean isReplicated) { - return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; + static Map getBaseCursorProperties(Boolean isReplicated) { + return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : + NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; } static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) { @@ -129,19 +131,22 @@ static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) { } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated) { + Boolean replicated) { this(topic, subscriptionName, cursor, replicated, Collections.emptyMap()); } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated, Map subscriptionProperties) { + Boolean replicated, Map subscriptionProperties) { this.topic = topic; this.cursor = cursor; this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this); - this.setReplicated(replicated); + this.replicatedControlled = replicated; + if (this.replicatedControlled != null && this.replicatedControlled) { + this.setReplicated(true); + } this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() @@ -179,6 +184,11 @@ public boolean isReplicated() { } public boolean setReplicated(boolean replicated) { + replicatedControlled = replicated; + return setReplicated(replicated, true); + } + + public boolean setReplicated(boolean replicated, boolean isPersistent) { ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (!replicated || !config.isEnableReplicatedSubscriptions()) { @@ -188,7 +198,7 @@ public boolean setReplicated(boolean replicated) { config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()); } - if (this.cursor != null) { + if (this.cursor != null && isPersistent) { if (replicated) { if (!config.isEnableReplicatedSubscriptions()) { log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the " @@ -1230,4 +1240,9 @@ public boolean checkIfPendingAckStoreInit() { public PendingAckHandle getPendingAckHandle() { return pendingAckHandle; } + + @VisibleForTesting + public Boolean getReplicatedControlled() { + return replicatedControlled; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 14c77e77e02eac..8e550b6e447a28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -277,7 +277,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS } else { final String subscriptionName = Codec.decode(cursor.getName()); subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, - PersistentSubscription.isCursorFromReplicatedSubscription(cursor), + PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true: null, cursor.getCursorProperties())); // subscription-cursor gets activated by default: deactivate as there is no active subscription right // now @@ -396,7 +396,7 @@ private CompletableFuture removeOrphanReplicationCursors() { } private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, - boolean replicated, Map subscriptionProperties) { + Boolean replicated, Map subscriptionProperties) { checkNotNull(compactedTopic); if (subscriptionName.equals(COMPACTION_SUBSCRIPTION)) { return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor); @@ -685,7 +685,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(), option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), - option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), + option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()), option.getConsumerEpoch()); } @@ -696,7 +696,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St Map metadata, boolean readCompacted, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean replicatedSubscriptionStateArg, + Boolean replicatedSubscriptionStateArg, KeySharedMeta keySharedMeta, Map subscriptionProperties, long consumerEpoch) { @@ -706,12 +706,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { - boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; - - if (replicatedSubscriptionState + if (replicatedSubscriptionStateArg != null && replicatedSubscriptionStateArg && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { log.warn("[{}] Replicated Subscription is disabled by broker.", getName()); - replicatedSubscriptionState = false; } if (subType == SubType.Key_Shared @@ -779,7 +776,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St CompletableFuture subscriptionFuture = isDurable ? // getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionState, subscriptionProperties) + replicatedSubscriptionStateArg, subscriptionProperties) : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec, readCompacted, subscriptionProperties); @@ -865,7 +862,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs } private CompletableFuture getDurableSubscription(String subscriptionName, - InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated, + InitialPosition initialPosition, long startMessageRollbackDurationSec, Boolean replicated, Map subscriptionProperties) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) { @@ -897,7 +894,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { return; } } - if (replicated && !subscription.isReplicated()) { + if (replicated !=null && replicated && !subscription.isReplicated()) { // Flip the subscription state subscription.setReplicated(replicated); } @@ -2933,8 +2930,16 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem } public synchronized void checkReplicatedSubscriptionControllerState() { + Boolean replicatedSubscriptionStatus = topicPolicies.getReplicateSubscriptionsEnabled().get(); AtomicBoolean shouldBeEnabled = new AtomicBoolean(false); subscriptions.forEach((name, subscription) -> { + // If the subscription does not have a replicated flag configured, apply the topic policies to the + // subscription. + // DO NOT override the replicated flag if it has been explicitly set by the client or admin API. + if (subscription.getReplicatedControlled() == null + && !PersistentSubscription.isCursorFromReplicatedSubscription(subscription.getCursor())) { + subscription.setReplicated(replicatedSubscriptionStatus != null && replicatedSubscriptionStatus, false); + } if (subscription.isReplicated()) { shouldBeEnabled.set(true); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiReplicateSubscriptionsEnabledTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiReplicateSubscriptionsEnabledTest.java new file mode 100644 index 00000000000000..0bbc935dfef944 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiReplicateSubscriptionsEnabledTest.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.awaitility.Awaitility.await; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-admin") +public class AdminApiReplicateSubscriptionsEnabledTest extends MockedPulsarServiceBaseTest { + @BeforeClass + @Override + public void setup() throws Exception { + super.internalSetup(); + super.setupDefaultTenantAndNamespace(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + } + + @AfterClass + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testReplicateSubscriptionsEnabledOnNamespaceLevel() throws PulsarAdminException { + String nsName = "public/testReplicateSubscriptionsEnabled" + System.nanoTime(); + admin.namespaces().createNamespace(nsName); + assertNull(admin.namespaces().getReplicateSubscriptionsEnabled(nsName)); + + String topicName = nsName + "/topic" + System.nanoTime(); + admin.topics().createNonPartitionedTopic(topicName); + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true)); + + admin.namespaces().setReplicateSubscriptionsEnabled(nsName, true); + assertTrue(admin.namespaces().getReplicateSubscriptionsEnabled(nsName)); + await().untilAsserted(() -> { + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, false)); + assertTrue(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true)); + }); + + admin.namespaces().setReplicateSubscriptionsEnabled(nsName, false); + assertFalse(admin.namespaces().getReplicateSubscriptionsEnabled(nsName)); + await().untilAsserted(() -> { + assertFalse(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true)); + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, false)); + }); + + admin.namespaces().setReplicateSubscriptionsEnabled(nsName, null); + assertNull(admin.namespaces().getReplicateSubscriptionsEnabled(nsName)); + await().untilAsserted(() -> { + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true)); + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, false)); + }); + } + + @Test + public void testReplicateSubscriptionsEnabledOnTopicLevel() throws PulsarAdminException { + String nsName = "public/testReplicateSubscriptionsEnabled" + System.nanoTime(); + admin.namespaces().createNamespace(nsName); + assertNull(admin.namespaces().getReplicateSubscriptionsEnabled(nsName)); + + String topicName = nsName + "/topic" + System.nanoTime(); + admin.topics().createNonPartitionedTopic(topicName); + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true)); + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, false)); + + admin.topicPolicies().setReplicateSubscriptionsEnabled(topicName, true); + await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true), Boolean.TRUE); + assertEquals(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, false), Boolean.TRUE); + }); + + admin.topicPolicies().setReplicateSubscriptionsEnabled(topicName, false); + await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true), Boolean.FALSE); + assertEquals(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, false), Boolean.FALSE); + }); + + admin.topicPolicies().setReplicateSubscriptionsEnabled(topicName, null); + await().untilAsserted(() -> { + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true)); + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true)); + }); + } + + @DataProvider + Object[] replicateSubscriptionsEnabledPriorityLevelDataProvider() { + return new Object[]{ + true, + false, + null, + }; + } + + @Test(dataProvider = "replicateSubscriptionsEnabledPriorityLevelDataProvider") + public void testReplicateSubscriptionsEnabledPriorityLevel(Boolean enabledOnNamespace) + throws PulsarAdminException { + String nsName = "public/testReplicateSubscriptionsEnabled" + System.nanoTime(); + admin.namespaces().createNamespace(nsName); + assertNull(admin.namespaces().getReplicateSubscriptionsEnabled(nsName)); + admin.namespaces().setReplicateSubscriptionsEnabled(nsName, enabledOnNamespace); + + String topicName = nsName + "/topic" + System.nanoTime(); + admin.topics().createNonPartitionedTopic(topicName); + + admin.topicPolicies().setReplicateSubscriptionsEnabled(topicName, false); + await().untilAsserted(() -> { + assertFalse(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true)); + assertFalse(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, false)); + }); + + admin.topicPolicies().setReplicateSubscriptionsEnabled(topicName, true); + await().untilAsserted(() -> { + assertTrue(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true)); + assertTrue(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, false)); + }); + + admin.topicPolicies().setReplicateSubscriptionsEnabled(topicName, null); + await().untilAsserted(() -> { + assertNull(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, false)); + assertEquals(admin.topicPolicies().getReplicateSubscriptionsEnabled(topicName, true), enabledOnNamespace); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/IgnoreConsumerReplicateSubscriptionStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/IgnoreConsumerReplicateSubscriptionStateTest.java new file mode 100644 index 00000000000000..b73aaec4bbfa39 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/IgnoreConsumerReplicateSubscriptionStateTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.AssertJUnit.assertTrue; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class IgnoreConsumerReplicateSubscriptionStateTest extends MockedPulsarServiceBaseTest { + @BeforeClass + @Override + public void setup() throws Exception { + super.internalSetup(); + super.setupDefaultTenantAndNamespace(); + } + + @AfterClass + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + Properties properties = conf.getProperties(); + if (properties==null) { + properties = new Properties(); + } + properties.put("ignoreConsumerReplicateSubscriptionState", "true"); + } + + @DataProvider + Object[] replicateSubscriptionState(){ + return new Object[]{true,false}; + } + + @Test(dataProvider = "replicateSubscriptionState") + public void ignoreConsumerReplicateSubscriptionState(boolean enabled) + throws PulsarClientException, ExecutionException, InterruptedException { + String topicName = "topic-" + System.nanoTime(); + String subName = "sub-" + System.nanoTime(); + @Cleanup Consumer ignored = + pulsarClient.newConsumer().topic(topicName).replicateSubscriptionState(enabled) + .subscriptionName(subName).subscribe(); + Optional topicOptional = pulsar.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional.isPresent()); + PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get(); + PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName); + assertNotNull(persistentSubscription); + assertNull(persistentSubscription.getReplicatedControlled()); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java new file mode 100644 index 00000000000000..06047e99d7ce31 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.awaitility.Awaitility.await; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +public class ReplicateSubscriptionTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setManagedLedgerCacheEvictionIntervalMs(10000); + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + conf.setEnableReplicatedSubscriptions(true); + } + + @DataProvider + public Object[] replicateSubscriptionState() { + return new Object[]{ + Boolean.TRUE, + Boolean.FALSE, + null + }; + } + + @Test(dataProvider = "replicateSubscriptionState") + public void testReplicateSubscriptionStateByConsumerBuilder(Boolean replicateSubscriptionState) { + String topic = "persistent://my-property/my-ns/" + System.nanoTime(); + String subName = "sub-"+ System.nanoTime(); + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName); + if (replicateSubscriptionState != null) { + consumerBuilder.replicateSubscriptionState(replicateSubscriptionState); + } + ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) consumerBuilder; + assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState); + } + + @Test(dataProvider = "replicateSubscriptionState") + public void testReplicateSubscriptionStateNotNullByConsumerBuilder(Boolean replicateSubscriptionState) { + String topic = "persistent://my-property/my-ns/" + System.nanoTime(); + String subName = "sub-"+ System.nanoTime(); + boolean enabled = replicateSubscriptionState != null && replicateSubscriptionState; + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .replicateSubscriptionState(enabled) + .subscriptionName(subName); + ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) consumerBuilder; + assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState().booleanValue(), enabled); + } + + @DataProvider + public Object[][] replicateSubscriptionStateMultipleLevel() { + return new Object[][]{ + // consumer level high priority. + {Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, true}, + {Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, false}, + {Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, true}, + {Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, false}, + {Boolean.FALSE, Boolean.TRUE, Boolean.TRUE, true}, + {Boolean.FALSE, Boolean.TRUE, Boolean.TRUE, false}, + + // namespace level high priority + {null, Boolean.TRUE, null, true}, + {null, Boolean.TRUE, null, false}, + {null, Boolean.FALSE, null, true}, + {null, Boolean.FALSE, null, false}, + + // topic level high priority. + {null, Boolean.TRUE, Boolean.TRUE, true}, + {null, Boolean.TRUE, Boolean.TRUE, false}, + {null, Boolean.TRUE, Boolean.FALSE, true}, + {null, Boolean.TRUE, Boolean.FALSE, false}, + {null, Boolean.FALSE, Boolean.TRUE, true}, + {null, Boolean.FALSE, Boolean.TRUE, false}, + + // All higher levels are null. + {null, null, null, true}, + {null, null, null, false} + }; + } + + /** + * The priority list is from high to low: consumer/subscription, topic, namespace. + */ + @Test(dataProvider = "replicateSubscriptionStateMultipleLevel") + public void testReplicateSubscriptionStateByConsumerAndAdminAPI( + Boolean consumerReplicateSubscriptionState, + Boolean replicateSubscriptionEnabledOnNamespaceLevel, + Boolean replicateSubscriptionEnabledOnTopicLevel, + boolean replicatedSubscriptionStatus + ) throws Exception { + String nsName = "my-property/my-ns-" + System.nanoTime(); + admin.namespaces().createNamespace(nsName); + String topic = "persistent://" + nsName + "/" + System.nanoTime(); + String subName = "sub"; + @Cleanup + Consumer ignored = null; + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName); + if (consumerReplicateSubscriptionState != null) { + consumerBuilder.replicateSubscriptionState(consumerReplicateSubscriptionState); + } + ignored = consumerBuilder.subscribe(); + + CompletableFuture> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic); + Optional topicOptional = topicIfExists.get(); + assertTrue(topicOptional.isPresent()); + Topic topicRef = topicOptional.get(); + Subscription subscription = topicRef.getSubscription(subName); + assertNotNull(subscription); + PersistentSubscription persistentSubscription = (PersistentSubscription) subscription; + + // Verify the consumer level. + assertEquals(persistentSubscription.getReplicatedControlled(), consumerReplicateSubscriptionState); + assertEquals(persistentSubscription.isReplicated(), + consumerReplicateSubscriptionState != null && consumerReplicateSubscriptionState); + + // Verify the namespace level. + admin.namespaces().setReplicateSubscriptionsEnabled(nsName, replicateSubscriptionEnabledOnNamespaceLevel); + await().untilAsserted(() -> { + assertEquals(admin.namespaces().getReplicateSubscriptionsEnabled(nsName), + replicateSubscriptionEnabledOnNamespaceLevel); + assertEquals(admin.topicPolicies().getReplicateSubscriptionsEnabled(topic, true), + replicateSubscriptionEnabledOnNamespaceLevel); + if (consumerReplicateSubscriptionState == null) { + // Using namespace policy. + assertEquals(persistentSubscription.isReplicated(), replicateSubscriptionEnabledOnNamespaceLevel != null + && replicateSubscriptionEnabledOnNamespaceLevel); + } else { + // Using subscription policy. + assertEquals(persistentSubscription.isReplicated(), + consumerReplicateSubscriptionState.booleanValue()); + } + }); + + // Verify the topic level. + admin.topicPolicies().setReplicateSubscriptionsEnabled(topic, replicateSubscriptionEnabledOnTopicLevel); + await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getReplicateSubscriptionsEnabled(topic, false), + replicateSubscriptionEnabledOnTopicLevel); + Boolean replicateSubscriptionsEnabled = admin.topicPolicies().getReplicateSubscriptionsEnabled(topic, true); + assertTrue(replicateSubscriptionsEnabled == replicateSubscriptionEnabledOnTopicLevel + || replicateSubscriptionsEnabled == replicateSubscriptionEnabledOnNamespaceLevel); + if (consumerReplicateSubscriptionState == null) { + if (replicateSubscriptionEnabledOnTopicLevel != null) { + // Using topic policy. + assertEquals(persistentSubscription.isReplicated(), + replicateSubscriptionEnabledOnTopicLevel.booleanValue()); + } else { + // Using namespace policy. + assertEquals(persistentSubscription.isReplicated(), + replicateSubscriptionEnabledOnNamespaceLevel != null + && replicateSubscriptionEnabledOnNamespaceLevel); + } + } else { + // Using subscription policy. + assertEquals(persistentSubscription.isReplicated(), + consumerReplicateSubscriptionState.booleanValue()); + } + }); + + // Verify the subscription level takes priority over the topic and namespace level. + admin.topics().setReplicatedSubscriptionStatus(topic, subName, replicatedSubscriptionStatus); + await().untilAsserted(() -> { + assertEquals(persistentSubscription.isReplicated(), replicatedSubscriptionStatus); + }); + } +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 7edba8fbfcd2c5..f54ca62acbc259 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -4372,4 +4372,53 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem * @return */ CompletableFuture removeNamespaceResourceGroupAsync(String namespace); + + /** + * Enable or disable subscriptions replication on a namespace. + * + * @param namespace Namespace name + * @param enabled The replication status to set: + *
    + *
  • true: Enable subscriptions replication.
  • + *
  • false: Disable subscriptions replication.
  • + *
  • null: Remove config.
  • + *
+ */ + void setReplicateSubscriptionsEnabled(String namespace, Boolean enabled) throws PulsarAdminException; + + /** + * Enable or disable subscriptions replication on a namespace asynchronously. + * + * @param namespace Namespace name + * @param enabled The replication status to set: + *
    + *
  • true: Enable subscriptions replication.
  • + *
  • false: Disable subscriptions replication.
  • + *
  • null: Remove config.
  • + *
+ */ + CompletableFuture setReplicateSubscriptionsEnabledAsync(String namespace, Boolean enabled); + + /** + * Get the enabled status of subscriptions replication on a namespace. + * + * @param namespace Namespace name + * @return true Subscriptions replication is enabled. + * false Subscriptions replication is disabled. + * null Subscriptions replication is not configured. + */ + Boolean getReplicateSubscriptionsEnabled(String namespace) throws PulsarAdminException; + + /** + * Get the enabled status of subscriptions replication on a namespace asynchronously. + * + * @param namespace Namespace name + * @return A {@link CompletableFuture} that will complete with the replication status: + *
    + *
  • true: Subscriptions replication is enabled.
  • + *
  • false: Subscriptions replication is disabled.
  • + *
  • null: Subscriptions replication is not configured.
  • + *
+ */ + CompletableFuture getReplicateSubscriptionsEnabledAsync(String namespace); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java index 644406fb25def4..c5c19de6dd1a28 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java @@ -1776,4 +1776,53 @@ SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean * @param topic Topic name */ CompletableFuture removeResourceGroupAsync(String topic); + + /** + * Enable or disable subscriptions replication on a topic. + * + * @param topic Topic name + * @param enabled The replication status to set: + *
    + *
  • true: Enable subscriptions replication.
  • + *
  • false: Disable subscriptions replication.
  • + *
  • null: Remove config.
  • + *
+ */ + void setReplicateSubscriptionsEnabled(String topic, Boolean enabled) throws PulsarAdminException; + + /** + * Enable or disable subscriptions replication on a topic asynchronously. + * + * @param topic Topic name + * @param enabled The replication status to set: + *
    + *
  • true: Enable subscriptions replication.
  • + *
  • false: Disable subscriptions replication.
  • + *
  • null: Remove config.
  • + *
+ */ + CompletableFuture setReplicateSubscriptionsEnabledAsync(String topic, Boolean enabled); + + /** + * Get the enabled status of subscriptions replication on a topic. + * + * @param topic Topic name + * @return true Subscriptions replication is enabled. + * false Subscriptions replication is disabled. + * null Subscriptions replication is not configured. + */ + Boolean getReplicateSubscriptionsEnabled(String topic, boolean applied) throws PulsarAdminException; + + /** + * Get the enabled status of subscriptions replication on a topic. + * + * @param topic Topic name + * @return A {@link CompletableFuture} that will complete with the replication status: + *
    + *
  • true: Subscriptions replication is enabled.
  • + *
  • false: Subscriptions replication is disabled.
  • + *
  • null: Subscriptions replication is not configured.
  • + *
+ */ + CompletableFuture getReplicateSubscriptionsEnabledAsync(String topic, boolean applied); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index ebcd7fc09c19e6..8f848be47bf869 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -124,6 +124,9 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public String resource_group_name = null; + @SuppressWarnings("checkstyle:MemberName") + public Boolean replicate_subscriptions_enabled; + public enum BundleType { LARGEST, HOT; } @@ -151,7 +154,8 @@ public int hashCode() { offload_policies, subscription_types_enabled, properties, - resource_group_name); + resource_group_name, + replicate_subscriptions_enabled); } @Override @@ -196,7 +200,8 @@ public boolean equals(Object obj) { && Objects.equals(offload_policies, other.offload_policies) && Objects.equals(subscription_types_enabled, other.subscription_types_enabled) && Objects.equals(properties, other.properties) - && Objects.equals(resource_group_name, other.resource_group_name); + && Objects.equals(resource_group_name, other.resource_group_name) + && Objects.equals(replicate_subscriptions_enabled, other.replicate_subscriptions_enabled); } return false; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 9b9f1f073f9ab9..fddd80793314be 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -2514,4 +2514,41 @@ private WebTarget namespacePath(NamespaceName namespace, String... parts) { namespacePath = WebTargets.addParts(namespacePath, parts); return namespacePath; } + + @Override + public void setReplicateSubscriptionsEnabled(String namespace, Boolean enabled) throws PulsarAdminException { + sync(() -> setReplicateSubscriptionsEnabledAsync(namespace, enabled)); + } + + @Override + public CompletableFuture setReplicateSubscriptionsEnabledAsync(String namespace, Boolean enabled) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "replicateSubscriptionsEnabled"); + return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); + } + + @Override + public Boolean getReplicateSubscriptionsEnabled(String namespace) throws PulsarAdminException { + return sync(() -> getReplicateSubscriptionsEnabledAsync(namespace)); + } + + @Override + public CompletableFuture getReplicateSubscriptionsEnabledAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "replicateSubscriptionsEnabled"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Boolean enabled) { + future.complete(enabled); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java index 60b49507e33fb3..8617cc6be30b1a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java @@ -1468,6 +1468,45 @@ public CompletableFuture removeResourceGroupAsync(String topic) { return setResourceGroupAsync(topic, null); } + @Override + public void setReplicateSubscriptionsEnabled(String topic, Boolean enabled) throws PulsarAdminException { + sync(() -> setReplicateSubscriptionsEnabledAsync(topic, enabled)); + } + + @Override + public CompletableFuture setReplicateSubscriptionsEnabledAsync(String topic, Boolean enabled) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "replicateSubscriptionsEnabled"); + return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); + } + + @Override + public Boolean getReplicateSubscriptionsEnabled(String topic, boolean applied) + throws PulsarAdminException { + return sync(() -> getReplicateSubscriptionsEnabledAsync(topic, applied)); + } + + @Override + public CompletableFuture getReplicateSubscriptionsEnabledAsync(String topic, boolean applied) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "replicateSubscriptionsEnabled"); + path = path.queryParam("applied", applied); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Boolean enabled) { + future.complete(enabled); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + /* * returns topic name with encoded Local Name */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 90ac520dce6c08..ae07aac01ea468 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -851,7 +851,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { setClientCnx(cnx); ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, - conf.isReplicateSubscriptionState(), + conf.getReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), // Use the current epoch to subscribe. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index ab2527b4932365..bb0af71fb91601 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -145,7 +145,7 @@ public int getMaxPendingChuckedMessage() { private long autoUpdatePartitionsIntervalSeconds = 60; - private boolean replicateSubscriptionState = false; + private Boolean replicateSubscriptionState; private boolean resetIncludeHead = false; @@ -185,4 +185,12 @@ public ConsumerConfigurationData clone() { throw new RuntimeException("Failed to clone ConsumerConfigurationData"); } } + + /** + * @deprecated Using {@link #getReplicateSubscriptionState()} instead. + */ + @Deprecated + public boolean isReplicateSubscriptionState() { + return replicateSubscriptionState != null && replicateSubscriptionState; + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 53b119c039c99b..5d2337d67d4548 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -57,8 +57,8 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue subscriptionDispatchRate; final PolicyHierarchyValue schemaCompatibilityStrategy; final PolicyHierarchyValue dispatchRate; - final PolicyHierarchyValue resourceGroupName; + final PolicyHierarchyValue replicateSubscriptionsEnabled; public HierarchyTopicPolicies() { replicationClusters = new PolicyHierarchyValue<>(); @@ -89,5 +89,6 @@ public HierarchyTopicPolicies() { schemaCompatibilityStrategy = new PolicyHierarchyValue<>(); dispatchRate = new PolicyHierarchyValue<>(); resourceGroupName = new PolicyHierarchyValue<>(); + replicateSubscriptionsEnabled = new PolicyHierarchyValue<>(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java index 2b115023f9d21d..42d8ecff222240 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java @@ -50,5 +50,6 @@ public enum PolicyName { ENCRYPTION, TTL, MAX_TOPICS, - RESOURCEGROUP + RESOURCEGROUP, + REPLICATED_SUBSCRIPTION, } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 2ec4b18a1dbe11..d84bbb0073f231 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -72,7 +72,7 @@ public class TopicPolicies { private DispatchRateImpl replicatorDispatchRate; private SchemaCompatibilityStrategy schemaCompatibilityStrategy; private String resourceGroupName; - + private Boolean replicateSubscriptionEnabled; public boolean isGlobalPolicies() { return isGlobal != null && isGlobal; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index dadaa56ae603d0..da07c82b58df1c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -562,7 +562,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, + Map metadata, boolean readCompacted, Boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, Map subscriptionProperties, long consumerEpoch) { @@ -578,10 +578,11 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu .setDurable(isDurable) .setReadCompacted(readCompacted) .setInitialPosition(subscriptionInitialPosition) - .setReplicateSubscriptionState(isReplicated) .setForceTopicCreation(createTopicIfDoesNotExist) .setConsumerEpoch(consumerEpoch); - + if (isReplicated != null) { + subscribe.setReplicateSubscriptionState(isReplicated); + } if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) { List keyValues = new ArrayList<>(); subscriptionProperties.forEach((key, value) -> {