Skip to content

Commit

Permalink
[feat][broker] PIP-398: Subscription replication on the namespace and…
Browse files Browse the repository at this point in the history
… topic levels

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Dec 25, 2024
1 parent 887d518 commit 09879ff
Show file tree
Hide file tree
Showing 23 changed files with 987 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2922,6 +2922,15 @@ protected void internalSetNamespaceResourceGroup(String rgName) {
internalSetPolicies("resource_group_name", rgName);
}

protected CompletableFuture<Void> internalSetReplicateSubscriptionStateAsync(Boolean enabled) {
return validatePoliciesReadOnlyAccessAsync()
.thenCompose(__ -> validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.REPLICATED_SUBSCRIPTION, PolicyOperation.WRITE))
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.replicate_subscription_state = enabled;
return policies;
}));
}

private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5016,6 +5016,29 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
}
}

protected CompletableFuture<Void> internalSetEnableReplicatedSubscription(Boolean enabled,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setReplicateSubscriptionState(enabled);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Boolean> internalGetReplicateSubscriptionState(boolean applied,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getReplicateSubscriptionState)
.orElseGet(() -> {
if (applied) {
return getNamespacePolicies(namespaceName).replicate_subscription_state;
}
return null;
}));
}

protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
boolean authoritative, boolean enabled) {
log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -1946,5 +1947,42 @@ public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
internalSetNamespaceResourceGroup(null);
}

@GET
@Path("/{tenant}/{namespace}/replicateSubscriptionState")
@ApiOperation(value = "Get the enabled status of subscriptions replication on a namespace.", response =
Boolean.class)
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
public Boolean getReplicateSubscriptionState(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace),
PolicyName.REPLICATED_SUBSCRIPTION, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.replicate_subscription_state;
}

@POST
@Path("/{tenant}/{namespace}/replicateSubscriptionState")
@ApiOperation(value = "Enable or disable subscriptions replication on a namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
public void setReplicateSubscriptionState(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "Whether to enable subscriptions replication",
required = true)
Boolean enabled) {
validateNamespaceName(tenant, namespace);
internalSetReplicateSubscriptionStateAsync(enabled)
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("set replicate subscriptions state failed", ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3896,6 +3896,85 @@ public void getReplicatedSubscriptionStatus(
internalGetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative);
}

@POST
@Path("/{tenant}/{namespace}/{topic}/replicateSubscriptionState")
@ApiOperation(value = "Enable or disable subscriptions replication on a topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or "
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Operation not allowed on this topic"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void setReplicateSubscriptionState(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Whether to enable subscriptions replication", required = true)
Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetEnableReplicatedSubscription(enabled, isGlobal))
.thenRun(() -> {
log.info(
"[{}] Successfully set topic replicated subscription enabled: tenant={}, namespace={}, "
+ "topic={}, isGlobal={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
handleTopicPolicyException("setReplicateSubscriptionState", ex, asyncResponse);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/replicateSubscriptionState")
@ApiOperation(value = "Get the enabled status of subscriptions replication on a topic.")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error")})
public void getReplicateSubscriptionState(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("applied") @DefaultValue("false") boolean applied,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicOperationAsync(topicName, TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetReplicateSubscriptionState(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getReplicateSubscriptionState", ex, asyncResponse);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/schemaCompatibilityStrategy")
@ApiOperation(value = "Get schema compatibility strategy on a topic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters());
topicPolicies.getSchemaCompatibilityStrategy()
.updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()));
topicPolicies.getReplicateSubscriptionState().updateTopicValue(data.getReplicateSubscriptionState());
}
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
Expand Down Expand Up @@ -278,6 +279,10 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getResourceGroupName().updateNamespaceValue(namespacePolicies.resource_group_name);
if (!isSystemTopic()) {
topicPolicies.getReplicateSubscriptionState()
.updateNamespaceValue(namespacePolicies.replicate_subscription_state);
}
}

private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {

private final long connectionLivenessCheckTimeoutMillis = 5000;

private final boolean ignoreConsumerReplicateSubscriptionState;

// Number of bytes pending to be published from a single specific IO thread.
private static final FastThreadLocal<MutableLong> pendingBytesPerThread = new FastThreadLocal<MutableLong>() {
@Override
Expand Down Expand Up @@ -281,6 +283,9 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2;
this.connectionController = new ConnectionController.DefaultConnectionController(conf);
this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
this.ignoreConsumerReplicateSubscriptionState =
Boolean.parseBoolean(
conf.getProperties().getProperty("ignoreConsumerReplicateSubscriptionState", "false"));
}

@Override
Expand Down Expand Up @@ -984,8 +989,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
? subscribe.getStartMessageRollbackDurationSec()
: -1;
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final Boolean isReplicated =
subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null;
final Boolean isReplicated = ignoreConsumerReplicateSubscriptionState ? null :
(subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null);
final boolean forceTopicCreation = subscribe.isForceTopicCreation();
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public boolean isReplicated() {

public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;
return setReplicated(replicated, true);
}

public boolean setReplicated(boolean replicated, boolean isPersistent) {
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
Expand All @@ -193,7 +197,7 @@ public boolean setReplicated(boolean replicated) {
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
}

if (this.cursor != null) {
if (this.cursor != null && isPersistent) {
if (replicated) {
if (!config.isEnableReplicatedSubscriptions()) {
log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
} else {
final String subscriptionName = Codec.decode(cursor.getName());
subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true : null,
cursor.getCursorProperties()));
// subscription-cursor gets activated by default: deactivate as there is no active subscription right
// now
Expand Down Expand Up @@ -2932,8 +2932,14 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
}

public synchronized void checkReplicatedSubscriptionControllerState() {
Boolean replicatedSubscriptionStatus = topicPolicies.getReplicateSubscriptionState().get();
AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
subscriptions.forEach((name, subscription) -> {
// If the subscription does not have a replicated flag configured, please apply the topic policies to the
// subscription.
if (subscription.getReplicatedControlled() == null) {
subscription.setReplicated(replicatedSubscriptionStatus != null && replicatedSubscriptionStatus, false);
}
if (subscription.isReplicated()) {
shouldBeEnabled.set(true);
}
Expand Down
Loading

0 comments on commit 09879ff

Please sign in to comment.