Skip to content

Commit

Permalink
[improve][broker] Introduce resourcegroup rate limit on topic and geo (
Browse files Browse the repository at this point in the history
…#9)

* [feat][broker] Add ResourceGroup management on the topic level

Signed-off-by: Zixuan Liu <[email protected]>

* [feat][broker] Add ResourceGroup-based dispatch rate limits to the Replicator

Signed-off-by: Zixuan Liu <[email protected]>

* [fix][broker] Consume the ResourceGroup dispatch quota

Signed-off-by: Zixuan Liu <[email protected]>

* [feat][broker] Add ResourceGroup-based dispatch rate limits to the Topic

Signed-off-by: Zixuan Liu <[email protected]>

* Address comments

Signed-off-by: Zixuan Liu <[email protected]>

---------

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Mar 15, 2024
1 parent f7f7c96 commit 10f039d
Show file tree
Hide file tree
Showing 32 changed files with 1,521 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public boolean resourceGroupExists(String resourceGroupName) throws MetadataStor
return exists(joinPath(BASE_PATH, resourceGroupName));
}

public CompletableFuture<Boolean> resourceGroupExistsAsync(String resourceGroupName) {
return existsAsync(joinPath(BASE_PATH, resourceGroupName));
}

public void createResourceGroup(String resourceGroupName, ResourceGroup rg) throws MetadataStoreException {
create(joinPath(BASE_PATH, resourceGroupName), rg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5267,4 +5267,40 @@ protected CompletableFuture<Void> internalSetSchemaCompatibilityStrategy(SchemaC
.updateTopicPoliciesAsync(topicName, topicPolicies);
}));
}

protected CompletableFuture<Void> internalSetResourceGroup(String resourceGroupName, boolean isGlobal) {
boolean isDelete = StringUtils.isEmpty(resourceGroupName);
return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE)
.thenCompose(__ -> {
if (isDelete) {
return CompletableFuture.completedFuture(true);
}
return resourceGroupResources().resourceGroupExistsAsync(resourceGroupName);
})
.thenCompose(exists -> {
if (!exists) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
"ResourceGroup does not exist"));
}
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setResourceGroupName(isDelete ? null : resourceGroupName);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
});
}

protected CompletableFuture<String> internalGetResourceGroup(boolean applied, boolean isGlobal) {
return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.READ)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getResourceGroupName)
.orElseGet(() -> {
if (applied) {
return getNamespacePolicies(namespaceName).resource_group_name;
}
return null;
})
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3928,5 +3928,61 @@ public void removeSchemaCompatibilityStrategy(
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/resourceGroup")
@ApiOperation(value = "Set ResourceGroup for a topic")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic doesn't exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")
})
public void setResourceGroup(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "ResourceGroup name", required = true) String resourceGroupName) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetResourceGroup(resourceGroupName, isGlobal))
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setResourceGroup", ex, asyncResponse);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/resourceGroup")
@ApiOperation(value = "Get ResourceGroup for a topic")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic doesn't exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")
})
public void getResourceGroup(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String encodedTopic,
@QueryParam("applied") @DefaultValue("false") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetResourceGroup(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getResourceGroup", ex, asyncResponse);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static class BytesAndMessagesCount {
public enum ResourceGroupMonitoringClass {
Publish,
Dispatch,
ReplicationDispatch,
// Storage; // Punt this for now, until we have a clearer idea of the usage, statistics, etc.
}

Expand All @@ -69,7 +70,7 @@ public enum ResourceGroupMonitoringClass {
public enum ResourceGroupRefTypes {
Tenants,
Namespaces,
// Topics; // Punt this for when we support direct ref/under from topics.
Topics
}

// Default ctor: it is not expected that anything outside of this package will need to directly
Expand All @@ -84,6 +85,13 @@ protected ResourceGroup(ResourceGroupService rgs, String name,
this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
log.info("attaching publish rate limiter {} to {} get {}", this.resourceGroupPublishLimiter.toString(), name,
this.getResourceGroupPublishLimiter());
this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager
.newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
log.info("attaching replication dispatch rate limiter {} to {}", this.resourceGroupReplicationDispatchLimiter,
name);
this.resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager
.newDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
log.info("attaching topic dispatch rate limiter {} to {}", this.resourceGroupDispatchLimiter, name);
}

// ctor for overriding the transport-manager fill/set buffer.
Expand All @@ -97,6 +105,10 @@ protected ResourceGroup(ResourceGroupService rgs, String rgName,
this.setResourceGroupMonitoringClassFields();
this.setResourceGroupConfigParameters(rgConfig);
this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager
.newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager
.newDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.ruPublisher = rgPublisher;
this.ruConsumer = rgConsumer;
}
Expand All @@ -107,12 +119,15 @@ public ResourceGroup(ResourceGroup other) {
this.resourceGroupName = other.resourceGroupName;
this.rgs = other.rgs;
this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter;
this.resourceGroupReplicationDispatchLimiter = other.resourceGroupReplicationDispatchLimiter;
this.resourceGroupDispatchLimiter = other.resourceGroupDispatchLimiter;
this.setResourceGroupMonitoringClassFields();

// ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer from other, if required.

this.resourceGroupNamespaceRefs = other.resourceGroupNamespaceRefs;
this.resourceGroupTenantRefs = other.resourceGroupTenantRefs;
this.resourceGroupTopicRefs = other.resourceGroupTopicRefs;

for (int idx = 0; idx < ResourceGroupMonitoringClass.values().length; idx++) {
PerMonitoringClassFields thisFields = this.monitoringClassFields[idx];
Expand Down Expand Up @@ -145,12 +160,19 @@ protected void updateResourceGroup(org.apache.pulsar.common.policies.data.Resour
pubBmc.messages = rgConfig.getPublishRateInMsgs();
pubBmc.bytes = rgConfig.getPublishRateInBytes();
this.resourceGroupPublishLimiter.update(pubBmc);
ResourceGroupRateLimiterManager
.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, rgConfig);
ResourceGroupRateLimiterManager.updateDispatchRateLimiter(resourceGroupDispatchLimiter, rgConfig);
}

protected long getResourceGroupNumOfNSRefs() {
return this.resourceGroupNamespaceRefs.size();
}

protected long getResourceGroupNumOfTopicRefs() {
return this.resourceGroupTopicRefs.size();
}

protected long getResourceGroupNumOfTenantRefs() {
return this.resourceGroupTenantRefs.size();
}
Expand All @@ -168,6 +190,9 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
case Namespaces:
set = this.resourceGroupNamespaceRefs;
break;
case Topics:
set = this.resourceGroupTopicRefs;
break;
}

if (ref) {
Expand All @@ -178,7 +203,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
set.add(name);

// If this is the first ref, register with the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) {
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size()
+ this.resourceGroupTopicRefs.size() == 1) {
if (log.isDebugEnabled()) {
log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
}
Expand All @@ -193,7 +219,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
set.remove(name);

// If this was the last ref, unregister from the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) {
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size()
+ this.resourceGroupTopicRefs.size() == 0) {
if (log.isDebugEnabled()) {
log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
}
Expand Down Expand Up @@ -221,6 +248,9 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) {
p = resourceUsage.setDispatch();
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p);

p = resourceUsage.setReplicationDispatch();
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p);

// Punt storage for now.
}

Expand All @@ -234,6 +264,9 @@ public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage)
p = resourceUsage.getDispatch();
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker);

p = resourceUsage.getReplicationDispatch();
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p, broker);

// Punt storage for now.
}

Expand Down Expand Up @@ -351,14 +384,6 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass

protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount newQuota) throws PulsarAdminException {
// Only the Publish side is functional now; add the Dispatch side code when the consume side is ready.
if (!ResourceGroupMonitoringClass.Publish.equals(monClass)) {
if (log.isDebugEnabled()) {
log.debug("Doing nothing for monClass={}; only Publish is functional", monClass);
}
return null;
}

this.checkMonitoringClass(monClass);
BytesAndMessagesCount oldBMCount;

Expand All @@ -367,7 +392,22 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo
oldBMCount = monEntity.quotaForNextPeriod;
try {
monEntity.quotaForNextPeriod = newQuota;
this.resourceGroupPublishLimiter.update(newQuota);
switch (monClass) {
case ReplicationDispatch:
ResourceGroupRateLimiterManager
.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, newQuota);
break;
case Publish:
this.resourceGroupPublishLimiter.update(newQuota);
break;
case Dispatch:
ResourceGroupRateLimiterManager.updateDispatchRateLimiter(resourceGroupDispatchLimiter, newQuota);
break;
default:
if (log.isDebugEnabled()) {
log.debug("Doing nothing for monClass={};", monClass);
}
}
} finally {
monEntity.localUsageStatsLock.unlock();
}
Expand Down Expand Up @@ -419,9 +459,16 @@ protected static BytesAndMessagesCount accumulateBMCount(BytesAndMessagesCount .
}

private void checkMonitoringClass(ResourceGroupMonitoringClass monClass) throws PulsarAdminException {
if (monClass != ResourceGroupMonitoringClass.Publish && monClass != ResourceGroupMonitoringClass.Dispatch) {
String errMesg = "Unexpected monitoring class: " + monClass;
throw new PulsarAdminException(errMesg);
switch (monClass) {
case Publish:
break;
case Dispatch:
break;
case ReplicationDispatch:
break;
default:
String errMesg = "Unexpected monitoring class: " + monClass;
throw new PulsarAdminException(errMesg);
}
}

Expand Down Expand Up @@ -566,6 +613,14 @@ private void setResourceGroupConfigParameters(org.apache.pulsar.common.policies.
? -1 : rgConfig.getDispatchRateInBytes();
this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getDispatchRateInMsgs() == null
? -1 : rgConfig.getDispatchRateInMsgs();

idx = ResourceGroupMonitoringClass.ReplicationDispatch.ordinal();
this.monitoringClassFields[idx]
.configValuesPerPeriod.bytes = rgConfig.getReplicationDispatchRateInBytes() == null
? -1 : rgConfig.getReplicationDispatchRateInBytes();
this.monitoringClassFields[idx]
.configValuesPerPeriod.messages = rgConfig.getReplicationDispatchRateInMsgs() == null
? -1 : rgConfig.getReplicationDispatchRateInMsgs();
}

private void setDefaultResourceUsageTransportHandlers() {
Expand Down Expand Up @@ -606,6 +661,7 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
// across all of its usage classes (publish/dispatch/...).
private Set<String> resourceGroupTenantRefs = ConcurrentHashMap.newKeySet();
private Set<String> resourceGroupNamespaceRefs = ConcurrentHashMap.newKeySet();
private Set<String> resourceGroupTopicRefs = ConcurrentHashMap.newKeySet();

// Blobs required for transport manager's resource-usage register/unregister ops.
ResourceUsageConsumer ruConsumer;
Expand Down Expand Up @@ -640,6 +696,12 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
@Getter
protected ResourceGroupPublishLimiter resourceGroupPublishLimiter;

@Getter
protected ResourceGroupDispatchLimiter resourceGroupReplicationDispatchLimiter;

@Getter
protected ResourceGroupDispatchLimiter resourceGroupDispatchLimiter;

protected static class PerMonitoringClassFields {
// This lock covers all the "local" counts (i.e., except for the per-broker usage stats).
Lock localUsageStatsLock;
Expand Down
Loading

0 comments on commit 10f039d

Please sign in to comment.