Skip to content

Commit

Permalink
[feat][broker] Add ResourceGroup management on the topic level
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Mar 6, 2024
1 parent c43645f commit e908106
Show file tree
Hide file tree
Showing 16 changed files with 822 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public boolean resourceGroupExists(String resourceGroupName) throws MetadataStor
return exists(joinPath(BASE_PATH, resourceGroupName));
}

public CompletableFuture<Boolean> resourceGroupExistsAsync(String resourceGroupName) {
return existsAsync(joinPath(BASE_PATH, resourceGroupName));
}

public void createResourceGroup(String resourceGroupName, ResourceGroup rg) throws MetadataStoreException {
create(joinPath(BASE_PATH, resourceGroupName), rg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5267,4 +5267,40 @@ protected CompletableFuture<Void> internalSetSchemaCompatibilityStrategy(SchemaC
.updateTopicPoliciesAsync(topicName, topicPolicies);
}));
}

protected CompletableFuture<Void> internalSetResourceGroup(String resourceGroupName, boolean isGlobal) {
boolean isDelete = StringUtils.isEmpty(resourceGroupName);
return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE)
.thenCompose(__ -> {
if (isDelete) {
return CompletableFuture.completedFuture(true);
}
return resourceGroupResources().resourceGroupExistsAsync(resourceGroupName);
})
.thenCompose(exists -> {
if (!exists) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
"ResourceGroup does not exist"));
}
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setResourceGroupName(isDelete ? null : resourceGroupName);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
});
}

protected CompletableFuture<String> internalGetResourceGroup(boolean applied, boolean isGlobal) {
return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.READ)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getResourceGroupName)
.orElseGet(() -> {
if (applied) {
return getNamespacePolicies(namespaceName).resource_group_name;
}
return null;
})
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3928,5 +3928,61 @@ public void removeSchemaCompatibilityStrategy(
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/resourceGroup")
@ApiOperation(value = "Set ResourceGroup for a topic")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic doesn't exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")
})
public void setResourceGroup(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "ResourceGroup name", required = true) String resourceGroupName) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetResourceGroup(resourceGroupName, isGlobal))
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setResourceGroup", ex, asyncResponse);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/resourceGroup")
@ApiOperation(value = "Get ResourceGroup for a topic")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic doesn't exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")
})
public void getResourceGroup(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String encodedTopic,
@QueryParam("applied") @DefaultValue("false") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetResourceGroup(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getResourceGroup", ex, asyncResponse);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public enum ResourceGroupMonitoringClass {
public enum ResourceGroupRefTypes {
Tenants,
Namespaces,
// Topics; // Punt this for when we support direct ref/under from topics.
Topics
}

// Default ctor: it is not expected that anything outside of this package will need to directly
Expand Down Expand Up @@ -113,6 +113,7 @@ public ResourceGroup(ResourceGroup other) {

this.resourceGroupNamespaceRefs = other.resourceGroupNamespaceRefs;
this.resourceGroupTenantRefs = other.resourceGroupTenantRefs;
this.resourceGroupTopicRefs = other.resourceGroupTopicRefs;

for (int idx = 0; idx < ResourceGroupMonitoringClass.values().length; idx++) {
PerMonitoringClassFields thisFields = this.monitoringClassFields[idx];
Expand Down Expand Up @@ -151,6 +152,10 @@ protected long getResourceGroupNumOfNSRefs() {
return this.resourceGroupNamespaceRefs.size();
}

protected long getResourceGroupNumOfTopicRefs() {
return this.resourceGroupTopicRefs.size();
}

protected long getResourceGroupNumOfTenantRefs() {
return this.resourceGroupTenantRefs.size();
}
Expand All @@ -168,6 +173,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
case Namespaces:
set = this.resourceGroupNamespaceRefs;
break;
case Topics:
set = this.resourceGroupTopicRefs;
}

if (ref) {
Expand All @@ -178,7 +185,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
set.add(name);

// If this is the first ref, register with the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) {
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size()
+ this.resourceGroupTopicRefs.size() == 1) {
if (log.isDebugEnabled()) {
log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
}
Expand All @@ -193,7 +201,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
set.remove(name);

// If this was the last ref, unregister from the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) {
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size()
+ this.resourceGroupTopicRefs.size() == 0) {
if (log.isDebugEnabled()) {
log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
}
Expand Down Expand Up @@ -606,6 +615,7 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
// across all of its usage classes (publish/dispatch/...).
private Set<String> resourceGroupTenantRefs = ConcurrentHashMap.newKeySet();
private Set<String> resourceGroupNamespaceRefs = ConcurrentHashMap.newKeySet();
private Set<String> resourceGroupTopicRefs = ConcurrentHashMap.newKeySet();

// Blobs required for transport manager's resource-usage register/unregister ops.
ResourceUsageConsumer ruConsumer;
Expand Down
Loading

0 comments on commit e908106

Please sign in to comment.