Skip to content

Commit

Permalink
[feat][broker] Add replicateSubscriptionsEnabled for namespace and topic
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Dec 19, 2024
1 parent 0ff335c commit c80c0cb
Show file tree
Hide file tree
Showing 24 changed files with 860 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2922,6 +2922,15 @@ protected void internalSetNamespaceResourceGroup(String rgName) {
internalSetPolicies("resource_group_name", rgName);
}

protected CompletableFuture<Void> internalSetReplicateSubscriptionsEnabledAsync(Boolean enabled) {
return validatePoliciesReadOnlyAccessAsync()
.thenCompose(__ -> validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.REPLICATED_SUBSCRIPTION, PolicyOperation.WRITE))
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.replicate_subscriptions_enabled = enabled;
return policies;
}));
}

private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5016,6 +5016,29 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
}
}

protected CompletableFuture<Void> internalSetEnableReplicatedSubscription(Boolean enabled,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setReplicateSubscriptionEnabled(enabled);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Boolean> internalGetReplicateSubscriptionsEnabled(boolean applied,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getReplicateSubscriptionEnabled)
.orElseGet(() -> {
if (applied) {
return getNamespacePolicies(namespaceName).replicate_subscriptions_enabled;
}
return null;
}));
}

protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
boolean authoritative, boolean enabled) {
log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -1946,5 +1947,42 @@ public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
internalSetNamespaceResourceGroup(null);
}

@GET
@Path("/{tenant}/{namespace}/replicateSubscriptionsEnabled")
@ApiOperation(value = "Get the enabled status of subscriptions replication on a namespace.", response =
Boolean.class)
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
public Boolean getReplicateSubscriptionsEnabled(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace),
PolicyName.REPLICATED_SUBSCRIPTION, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.replicate_subscriptions_enabled;
}

@POST
@Path("/{tenant}/{namespace}/replicateSubscriptionsEnabled")
@ApiOperation(value = "Enable or disable subscriptions replication on a namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
public void setReplicateSubscriptionsEnabled(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "Whether to enable subscriptions replication",
required = true)
Boolean enabled) {
validateNamespaceName(tenant, namespace);
internalSetReplicateSubscriptionsEnabledAsync(enabled)
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("set replicate subscriptions enabled failed", ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3896,6 +3896,85 @@ public void getReplicatedSubscriptionStatus(
internalGetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative);
}

@POST
@Path("/{tenant}/{namespace}/{topic}/replicateSubscriptionsEnabled")
@ApiOperation(value = "Enable or disable subscriptions replication on a topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or "
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Operation not allowed on this topic"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void setReplicateSubscriptionsEnabled(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Whether to enable subscriptions replication", required = true)
Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetEnableReplicatedSubscription(enabled, isGlobal))
.thenRun(() -> {
log.info(
"[{}] Successfully set topic replicated subscription enabled: tenant={}, namespace={}, "
+ "topic={}, isGlobal={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
handleTopicPolicyException("setReplicateSubscriptionsEnabled", ex, asyncResponse);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/replicateSubscriptionsEnabled")
@ApiOperation(value = "Get the enabled status of subscriptions replication on a topic.")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error")})
public void getReplicateSubscriptionsEnabled(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("applied") @DefaultValue("false") boolean applied,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicOperationAsync(topicName, TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetReplicateSubscriptionsEnabled(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getReplicateSubscriptionsEnabled", ex, asyncResponse);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy")
@ApiOperation(value = "Get schema compatibility strategy on a topic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters());
topicPolicies.getSchemaCompatibilityStrategy()
.updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()));
topicPolicies.getReplicateSubscriptionsEnabled().updateTopicValue(data.getReplicateSubscriptionEnabled());
}
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
Expand Down Expand Up @@ -278,6 +279,10 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getResourceGroupName().updateNamespaceValue(namespacePolicies.resource_group_name);
if (!isSystemTopic()) {
topicPolicies.getReplicateSubscriptionsEnabled()
.updateNamespaceValue(namespacePolicies.replicate_subscriptions_enabled);
}
}

private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {

private final long connectionLivenessCheckTimeoutMillis = 5000;

private final boolean ignoreConsumerReplicateSubscriptionState;

// Number of bytes pending to be published from a single specific IO thread.
private static final FastThreadLocal<MutableLong> pendingBytesPerThread = new FastThreadLocal<MutableLong>() {
@Override
Expand Down Expand Up @@ -281,6 +283,9 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2;
this.connectionController = new ConnectionController.DefaultConnectionController(conf);
this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
this.ignoreConsumerReplicateSubscriptionState =
Boolean.parseBoolean(
conf.getProperties().getProperty("ignoreConsumerReplicateSubscriptionState", "false"));
}

@Override
Expand Down Expand Up @@ -984,8 +989,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
? subscribe.getStartMessageRollbackDurationSec()
: -1;
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState()
&& subscribe.isReplicateSubscriptionState();
final Boolean isReplicated = ignoreConsumerReplicateSubscriptionState ? null :
(subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null);
final boolean forceTopicCreation = subscribe.isForceTopicCreation();
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class SubscriptionOption {
private boolean readCompacted;
private CommandSubscribe.InitialPosition initialPosition;
private long startMessageRollbackDurationSec;
private boolean replicatedSubscriptionStateArg;
private Boolean replicatedSubscriptionStateArg;
private KeySharedMeta keySharedMeta;
private Optional<Map<String, String>> subscriptionProperties;
private long consumerEpoch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(),
option.isDurable(), option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(), option.getInitialPosition(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getStartMessageRollbackDurationSec(), option.getReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null));
}

Expand All @@ -269,7 +269,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
MessageId startMessageId, Map<String, String> metadata,
boolean readCompacted, InitialPosition initialPosition,
long resetStartMessageBackInSec,
boolean replicateSubscriptionState,
Boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,33 +115,38 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;
private volatile Boolean replicatedControlled;

static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}

static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
return cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated) {
Boolean replicated) {
this(topic, subscriptionName, cursor, replicated, Collections.emptyMap());
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Boolean replicated, Map<String, String> subscriptionProperties) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this);
this.setReplicated(replicated);
this.replicatedControlled = replicated;
if (this.replicatedControlled != null) {
this.setReplicated(this.replicatedControlled);
}
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
Expand Down Expand Up @@ -179,6 +184,11 @@ public boolean isReplicated() {
}

public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;
return setReplicated(replicated, true);
}

public boolean setReplicated(boolean replicated, boolean isPersistent) {
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
Expand All @@ -188,7 +198,7 @@ public boolean setReplicated(boolean replicated) {
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
}

if (this.cursor != null) {
if (this.cursor != null && isPersistent) {
if (replicated) {
if (!config.isEnableReplicatedSubscriptions()) {
log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the "
Expand Down Expand Up @@ -1230,4 +1240,9 @@ public boolean checkIfPendingAckStoreInit() {
public PendingAckHandle getPendingAckHandle() {
return pendingAckHandle;
}

@VisibleForTesting
public Boolean getReplicatedControlled() {
return replicatedControlled;
}
}
Loading

0 comments on commit c80c0cb

Please sign in to comment.