diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java index 414bf4ffcfc35..e59f11e9d968b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java @@ -46,6 +46,10 @@ public boolean resourceGroupExists(String resourceGroupName) throws MetadataStor return exists(joinPath(BASE_PATH, resourceGroupName)); } + public CompletableFuture resourceGroupExistsAsync(String resourceGroupName) { + return existsAsync(joinPath(BASE_PATH, resourceGroupName)); + } + public void createResourceGroup(String resourceGroupName, ResourceGroup rg) throws MetadataStoreException { create(joinPath(BASE_PATH, resourceGroupName), rg); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f8899e4a8a3a6..e9aa9f8ef670d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -5267,4 +5267,40 @@ protected CompletableFuture internalSetSchemaCompatibilityStrategy(SchemaC .updateTopicPoliciesAsync(topicName, topicPolicies); })); } + + protected CompletableFuture internalSetResourceGroup(String resourceGroupName, boolean isGlobal) { + boolean isDelete = StringUtils.isEmpty(resourceGroupName); + return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE) + .thenCompose(__ -> { + if (isDelete) { + return CompletableFuture.completedFuture(true); + } + return resourceGroupResources().resourceGroupExistsAsync(resourceGroupName); + }) + .thenCompose(exists -> { + if (!exists) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + "ResourceGroup does not exist")); + } + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setResourceGroupName(isDelete ? null : resourceGroupName); + topicPolicies.setIsGlobal(isGlobal); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); + }); + } + + protected CompletableFuture internalGetResourceGroup(boolean applied, boolean isGlobal) { + return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.READ) + .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenApply(op -> op.map(TopicPolicies::getResourceGroupName) + .orElseGet(() -> { + if (applied) { + return getNamespacePolicies(namespaceName).resource_group_name; + } + return null; + }) + )); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 26889da82d417..c1826266b32c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -3928,5 +3928,61 @@ public void removeSchemaCompatibilityStrategy( }); } + @POST + @Path("/{tenant}/{namespace}/{topic}/resourceGroup") + @ApiOperation(value = "Set ResourceGroup for a topic") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic doesn't exist"), + @ApiResponse(code = 405, message = + "Topic level policy is disabled, enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification") + }) + public void setResourceGroup( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "ResourceGroup name", required = true) String resourceGroupName) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetResourceGroup(resourceGroupName, isGlobal)) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setResourceGroup", ex, asyncResponse); + return null; + }); + } + + @GET + @Path("/{tenant}/{namespace}/{topic}/resourceGroup") + @ApiOperation(value = "Get ResourceGroup for a topic") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic doesn't exist"), + @ApiResponse(code = 405, message = + "Topic level policy is disabled, enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification") + }) + public void getResourceGroup( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") String encodedTopic, + @QueryParam("applied") @DefaultValue("false") boolean applied, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetResourceGroup(applied, isGlobal)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getResourceGroup", ex, asyncResponse); + return null; + }); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index 79db7da063034..d927b628e9f69 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -69,7 +69,7 @@ public enum ResourceGroupMonitoringClass { public enum ResourceGroupRefTypes { Tenants, Namespaces, - // Topics; // Punt this for when we support direct ref/under from topics. + Topics } // Default ctor: it is not expected that anything outside of this package will need to directly @@ -113,6 +113,7 @@ public ResourceGroup(ResourceGroup other) { this.resourceGroupNamespaceRefs = other.resourceGroupNamespaceRefs; this.resourceGroupTenantRefs = other.resourceGroupTenantRefs; + this.resourceGroupTopicRefs = other.resourceGroupTopicRefs; for (int idx = 0; idx < ResourceGroupMonitoringClass.values().length; idx++) { PerMonitoringClassFields thisFields = this.monitoringClassFields[idx]; @@ -151,6 +152,10 @@ protected long getResourceGroupNumOfNSRefs() { return this.resourceGroupNamespaceRefs.size(); } + protected long getResourceGroupNumOfTopicRefs() { + return this.resourceGroupTopicRefs.size(); + } + protected long getResourceGroupNumOfTenantRefs() { return this.resourceGroupTenantRefs.size(); } @@ -168,6 +173,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes case Namespaces: set = this.resourceGroupNamespaceRefs; break; + case Topics: + set = this.resourceGroupTopicRefs; } if (ref) { @@ -178,7 +185,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes set.add(name); // If this is the first ref, register with the transport manager. - if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) { + if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() + + this.resourceGroupTopicRefs.size() == 1) { if (log.isDebugEnabled()) { log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName); } @@ -193,7 +201,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes set.remove(name); // If this was the last ref, unregister from the transport manager. - if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) { + if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() + + this.resourceGroupTopicRefs.size() == 0) { if (log.isDebugEnabled()) { log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName); } @@ -606,6 +615,7 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { // across all of its usage classes (publish/dispatch/...). private Set resourceGroupTenantRefs = ConcurrentHashMap.newKeySet(); private Set resourceGroupNamespaceRefs = ConcurrentHashMap.newKeySet(); + private Set resourceGroupTopicRefs = ConcurrentHashMap.newKeySet(); // Blobs required for transport manager's resource-usage register/unregister ops. ResourceUsageConsumer ruConsumer; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index c74681fdb731a..9fcdb6790b30b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -167,9 +167,11 @@ public void resourceGroupDelete(String name) throws PulsarAdminException { long tenantRefCount = rg.getResourceGroupNumOfTenantRefs(); long nsRefCount = rg.getResourceGroupNumOfNSRefs(); - if ((tenantRefCount + nsRefCount) > 0) { + long topicRefCount = rg.getResourceGroupNumOfTopicRefs(); + if ((tenantRefCount + nsRefCount + topicRefCount) > 0) { String errMesg = "Resource group " + name + " still has " + tenantRefCount + " tenant refs"; errMesg += " and " + nsRefCount + " namespace refs on it"; + errMesg += " and " + topicRefCount + " topic refs on it"; throw new PulsarAdminException(errMesg); } @@ -293,6 +295,45 @@ public void unRegisterNameSpace(String resourceGroupName, NamespaceName fqNamesp rgNamespaceUnRegisters.labels(resourceGroupName).inc(); } + /** + * Registers a topic as a user of a resource group. + * + * @param resourceGroupName + * @param topicName complete topic name + */ + public void registerTopic(String resourceGroupName, TopicName topicName) { + ResourceGroup rg = resourceGroupsMap.get(resourceGroupName); + if (rg == null) { + throw new IllegalStateException("Resource group does not exist: " + resourceGroupName); + } + + ResourceGroupOpStatus status = rg.registerUsage(topicName.toString(), ResourceGroupRefTypes.Topics, + true, this.resourceUsageTransportManagerMgr); + if (status == ResourceGroupOpStatus.Exists) { + String msg = String.format("Topic %s already references the target resource group %s", + topicName, resourceGroupName); + throw new IllegalStateException(msg); + } + + // Associate this topic-name with the RG. + this.topicToRGsMap.put(topicName, rg); + rgTopicRegisters.labels(resourceGroupName).inc(); + } + + /** + * UnRegisters a topic from a resource group. + * + * @param topicName complete topic name + */ + public void unRegisterTopic(TopicName topicName) { + ResourceGroup remove = this.topicToRGsMap.remove(topicName); + if (remove != null) { + remove.registerUsage(topicName.toString(), ResourceGroupRefTypes.Topics, + false, this.resourceUsageTransportManagerMgr); + rgTopicUnRegisters.labels(remove.resourceGroupName).inc(); + } + } + /** * Return the resource group associated with a namespace. * @@ -303,6 +344,11 @@ public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) { return this.namespaceToRGsMap.get(namespaceName); } + @VisibleForTesting + public ResourceGroup getTopicResourceGroup(TopicName topicName) { + return this.topicToRGsMap.get(topicName); + } + @Override public void close() throws Exception { if (aggregateLocalUsagePeriodicTask != null) { @@ -318,8 +364,16 @@ public void close() throws Exception { topicConsumeStats.clear(); } + private void incrementUsage(ResourceGroup resourceGroup, + ResourceGroupMonitoringClass monClass, BytesAndMessagesCount incStats) + throws PulsarAdminException { + resourceGroup.incrementLocalUsageStats(monClass, incStats); + rgLocalUsageBytes.labels(resourceGroup.resourceGroupName, monClass.name()).inc(incStats.bytes); + rgLocalUsageMessages.labels(resourceGroup.resourceGroupName, monClass.name()).inc(incStats.messages); + } + /** - * Increments usage stats for the resource groups associated with the given namespace and tenant. + * Increments usage stats for the resource groups associated with the given namespace, tenant, and topic. * Expected to be called when a message is produced or consumed on a topic, or when we calculate * usage periodically in the background by going through broker-service stats. [Not yet decided * which model we will follow.] Broker-service stats will be cumulative, while calls from the @@ -327,22 +381,25 @@ public void close() throws Exception { * * If the tenant and NS are associated with different RGs, the statistics on both RGs are updated. * If the tenant and NS are associated with the same RG, the stats on the RG are updated only once + * If the tenant, NS and topic are associated with the same RG, the stats on the RG are updated only once * (to avoid a direct double-counting). * ToDo: will this distinction result in "expected semantics", or shock from users? * For now, the only caller is internal to this class. * + * @param topicName Complete topic name * @param tenantName - * @param nsName + * @param nsName Complete namespace name * @param monClass * @param incStats * @returns true if the stats were updated; false if nothing was updated. */ - protected boolean incrementUsage(String tenantName, String nsName, - ResourceGroupMonitoringClass monClass, - BytesAndMessagesCount incStats) throws PulsarAdminException { - final ResourceGroup nsRG = this.namespaceToRGsMap.get(NamespaceName.get(tenantName, nsName)); + protected boolean incrementUsage(String topicName, String tenantName, String nsName, + ResourceGroupMonitoringClass monClass, + BytesAndMessagesCount incStats) throws PulsarAdminException { + final ResourceGroup nsRG = this.namespaceToRGsMap.get(NamespaceName.get(nsName)); final ResourceGroup tenantRG = this.tenantToRGsMap.get(tenantName); - if (tenantRG == null && nsRG == null) { + final ResourceGroup topicRG = this.topicToRGsMap.get(TopicName.get(topicName)); + if (tenantRG == null && nsRG == null && topicRG == null) { return false; } @@ -353,24 +410,40 @@ protected boolean incrementUsage(String tenantName, String nsName, throw new PulsarAdminException(errMesg); } - if (nsRG == tenantRG) { + if (tenantRG == nsRG && nsRG == topicRG) { // Update only once in this case. - // Note that we will update both tenant and namespace RGs in other cases. - nsRG.incrementLocalUsageStats(monClass, incStats); - rgLocalUsageMessages.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.messages); - rgLocalUsageBytes.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.bytes); + // Note that we will update both tenant, namespace and topic RGs in other cases. + incrementUsage(tenantRG, monClass, incStats); + return true; + } + + if (tenantRG != null && tenantRG == nsRG) { + // Tenant and Namespace GRs are same. + incrementUsage(tenantRG, monClass, incStats); + if (topicRG == null) { + return true; + } + } + + if (nsRG != null && nsRG == topicRG) { + // Namespace and Topic GRs are same. + incrementUsage(nsRG, monClass, incStats); return true; } if (tenantRG != null) { - tenantRG.incrementLocalUsageStats(monClass, incStats); - rgLocalUsageMessages.labels(tenantRG.resourceGroupName, monClass.name()).inc(incStats.messages); - rgLocalUsageBytes.labels(tenantRG.resourceGroupName, monClass.name()).inc(incStats.bytes); + // Tenant GR is different from other resource GR. + incrementUsage(tenantRG, monClass, incStats); } + if (nsRG != null) { - nsRG.incrementLocalUsageStats(monClass, incStats); - rgLocalUsageMessages.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.messages); - rgLocalUsageBytes.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.bytes); + // Namespace GR is different from other resource GR. + incrementUsage(nsRG, monClass, incStats); + } + + if (topicRG != null) { + // Topic GR is different from other resource GR. + incrementUsage(topicRG, monClass, incStats); } return true; @@ -459,7 +532,7 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n } try { - boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff); + boolean statsUpdated = this.incrementUsage(topicName, tenantString, nsString, monClass, bmDiff); if (log.isDebugEnabled()) { log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; " + "by {} bytes, {} mesgs", @@ -550,14 +623,15 @@ protected void aggregateResourceGroupLocalUsages() { final TopicStats topicStats = entry.getValue(); final TopicName topic = TopicName.get(topicName); final String tenantString = topic.getTenant(); - final String nsString = topic.getNamespacePortion(); + final String nsString = topic.getNamespace(); final NamespaceName fqNamespace = topic.getNamespaceObject(); // Can't use containsKey here, as that checks for exact equality // (we need a check for string-comparison). val tenantRG = this.tenantToRGsMap.get(tenantString); val namespaceRG = this.namespaceToRGsMap.get(fqNamespace); - if (tenantRG == null && namespaceRG == null) { + val topicRG = this.topicToRGsMap.get(topic); + if (tenantRG == null && namespaceRG == null && topicRG == null) { // This topic's NS/tenant are not registered to any RG. continue; } @@ -745,6 +819,7 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie // Given a qualified NS-name (i.e., in "tenant/namespace" format), record its associated resource-group private ConcurrentHashMap namespaceToRGsMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap topicToRGsMap = new ConcurrentHashMap<>(); // Maps to maintain the usage per topic, in produce/consume directions. private ConcurrentHashMap topicProduceStats = new ConcurrentHashMap<>(); @@ -832,6 +907,17 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie .labelNames(resourceGroupLabel) .register(); + private static final Counter rgTopicRegisters = Counter.build() + .name("pulsar_resource_group_topic_registers") + .help("Number of registrations of topics") + .labelNames(resourceGroupLabel) + .register(); + private static final Counter rgTopicUnRegisters = Counter.build() + .name("pulsar_resource_group_topic_unregisters") + .help("Number of un-registrations of topics") + .labelNames(resourceGroupLabel) + .register(); + private static final Summary rgUsageAggregationLatency = Summary.build() .quantile(0.5, 0.05) .quantile(0.9, 0.01) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 7f5c30a729fe7..5c6657d4a7a0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -50,6 +50,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resourcegroup.ResourceGroup; import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; @@ -225,6 +226,7 @@ protected void updateTopicPolicy(TopicPolicies data) { topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate())); topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold()); topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate())); + topicPolicies.getResourceGroupName().updateTopicValue(data.getResourceGroupName()); } protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { @@ -271,6 +273,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { brokerService.getPulsar().getConfig().getClusterName()); updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies); updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); + topicPolicies.getResourceGroupName().updateNamespaceValue(namespacePolicies.resource_group_name); } private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) { @@ -1120,31 +1123,54 @@ public void updateResourceGroupLimiter(Optional optPolicies) { updateResourceGroupLimiter(policies); } + /** + * @deprecated Use {@link #updateDispatchRateLimiter()} instead. + */ + @Deprecated public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { requireNonNull(namespacePolicies); - // attach the resource-group level rate limiters, if set - String rgName = namespacePolicies.resource_group_name; + topicPolicies.getResourceGroupName().updateNamespaceValue(namespacePolicies.resource_group_name); + updateResourceGroupLimiter(); + } + + public void updateResourceGroupLimiter() { + String rgName = topicPolicies.getResourceGroupName().get(); if (rgName != null) { - final ResourceGroup resourceGroup = - brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); + ResourceGroupService resourceGroupService = brokerService.getPulsar().getResourceGroupServiceManager(); + final ResourceGroup resourceGroup = resourceGroupService.resourceGroupGet(rgName); if (resourceGroup != null) { + TopicName topicName = TopicName.get(topic); + resourceGroupService.unRegisterTopic(topicName); + String topicRg = topicPolicies.getResourceGroupName().getTopicValue(); + if (topicRg != null) { + try { + resourceGroupService.registerTopic(topicRg, topicName); + } catch (Exception e) { + log.error("Failed to register resource group {} for topic {}", rgName, topic); + return; + } + } this.resourceGroupRateLimitingEnabled = true; this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter(); this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead); log.info("Using resource group {} rate limiter for topic {}", rgName, topic); - return; } } else { - if (this.resourceGroupRateLimitingEnabled) { - this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); - this.resourceGroupPublishLimiter = null; - this.resourceGroupRateLimitingEnabled = false; - } + closeResourceGroupLimiter(); + /* Namespace detached from resource group. Enable the producer read */ enableProducerReadForPublishRateLimiting(); } } + protected void closeResourceGroupLimiter() { + if (resourceGroupRateLimitingEnabled) { + this.resourceGroupPublishLimiter = null; + this.resourceGroupRateLimitingEnabled = false; + brokerService.getPulsar().getResourceGroupServiceManager().unRegisterTopic(TopicName.get(topic)); + } + } + public long getMsgInCounter() { return this.msgInCounter.longValue(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index e3ecd39b71898..3f9f088baa583 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -170,7 +170,7 @@ public CompletableFuture initialize() { schemaValidationEnforced = policies.schema_validation_enforced; } updatePublishRateLimiter(); - updateResourceGroupLimiter(policies); + updateResourceGroupLimiter(); }); } @@ -444,6 +444,9 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c brokerService.executor().execute(() -> { brokerService.removeTopicFromCache(NonPersistentTopic.this); unregisterTopicPolicyListener(); + + closeResourceGroupLimiter(); + log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); }); @@ -511,6 +514,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect brokerService.executor().execute(() -> { brokerService.removeTopicFromCache(NonPersistentTopic.this); unregisterTopicPolicyListener(); + closeResourceGroupLimiter(); closeFuture.complete(null); }); }).exceptionally(exception -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cf6b82294dc02..148d5862e8c37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -319,7 +319,7 @@ public CompletableFuture initialize() { if (!optPolicies.isPresent()) { isEncryptionRequired = false; updatePublishRateLimiter(); - updateResourceGroupLimiter(new Policies()); + updateResourceGroupLimiter(); initializeDispatchRateLimiterIfNeeded(); updateSubscribeRateLimiter(); return; @@ -335,7 +335,7 @@ public CompletableFuture initialize() { updatePublishRateLimiter(); - updateResourceGroupLimiter(policies); + updateResourceGroupLimiter(); this.isEncryptionRequired = policies.encryption_required; @@ -1224,6 +1224,8 @@ public void deleteLedgerComplete(Object ctx) { unregisterTopicPolicyListener(); + closeResourceGroupLimiter(); + log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); } @@ -1342,6 +1344,9 @@ private void disposeTopic(CompletableFuture closeFuture) { subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); unregisterTopicPolicyListener(); + + closeResourceGroupLimiter(); + log.info("[{}] Topic closed", topic); cancelFencedTopicMonitoringTask(); closeFuture.complete(null); @@ -2457,7 +2462,7 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { // Apply policies for components. List> applyPolicyTasks = applyUpdatedTopicPolicies(); - applyPolicyTasks.add(applyUpdatedNamespacePolicies(data)); + applyPolicyTasks.add(applyUpdatedNamespacePolicies()); return FutureUtil.waitForAll(applyPolicyTasks) .thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic)) .exceptionally(ex -> { @@ -2466,8 +2471,8 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { }); } - private CompletableFuture applyUpdatedNamespacePolicies(Policies namespaceLevelPolicies) { - return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter(namespaceLevelPolicies)); + private CompletableFuture applyUpdatedNamespacePolicies() { + return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter()); } private List> applyUpdatedTopicPolicies() { @@ -2486,6 +2491,7 @@ private List> applyUpdatedTopicPolicies() { applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateDispatchRateLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscribeRateLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updatePublishRateLimiter())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscriptionsDispatcherRateLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java new file mode 100644 index 0000000000000..7c539e13044f8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import java.util.UUID; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ResourceGroup; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Unit test {@link AdminResource}. + */ +@Test(groups = "broker-admin") +public class AdminResourceGroupTest extends BrokerTestBase { + + @BeforeClass + @Override + public void setup() throws Exception { + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + } + + @Test + public void testTopicResourceGroup() throws PulsarAdminException { + String topic = newTopicName(); + TopicName topicName = TopicName.get(topic); + + String resourceGroupName = "rg-topic-" + UUID.randomUUID(); + ResourceGroup resourceGroup = new ResourceGroup(); + resourceGroup.setPublishRateInMsgs(1000); + resourceGroup.setPublishRateInBytes(100000L); + resourceGroup.setDispatchRateInMsgs(2000); + resourceGroup.setDispatchRateInBytes(200000L); + admin.resourcegroups().createResourceGroup(resourceGroupName, resourceGroup); + + admin.topics().createNonPartitionedTopic(topic); + + admin.topicPolicies().setResourceGroup(topic, resourceGroupName); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getResourceGroup(topic, true), resourceGroupName); + }); + + Awaitility.await().untilAsserted(() -> { + org.apache.pulsar.broker.resourcegroup.ResourceGroup rg = pulsar.getResourceGroupServiceManager() + .getTopicResourceGroup(topicName); + assertNotNull(rg); + assertEquals(rg.resourceGroupName, resourceGroupName); + }); + + admin.topicPolicies().removeResourceGroup(topic); + Awaitility.await().untilAsserted(() -> { + assertTrue(StringUtils.isEmpty(admin.topicPolicies().getResourceGroup(topic, true))); + org.apache.pulsar.broker.resourcegroup.ResourceGroup rg = pulsar.getResourceGroupServiceManager() + .getTopicResourceGroup(topicName); + assertNull(rg); + }); + } + + @Test + public void testTopicResourceGroupOverriderNamespaceResourceGroup() throws PulsarAdminException { + String namespaceResourceGroupName = "rg-ns-" + UUID.randomUUID(); + ResourceGroup namespaceResourceGroup = new ResourceGroup(); + namespaceResourceGroup.setPublishRateInMsgs(1001); + namespaceResourceGroup.setPublishRateInBytes(100001L); + namespaceResourceGroup.setDispatchRateInMsgs(2001); + namespaceResourceGroup.setDispatchRateInBytes(200001L); + admin.resourcegroups().createResourceGroup(namespaceResourceGroupName, namespaceResourceGroup); + + String topicResourceGroupName = "rg-topic-" + UUID.randomUUID(); + ResourceGroup topicResourceGroup = new ResourceGroup(); + topicResourceGroup.setPublishRateInMsgs(1000); + topicResourceGroup.setPublishRateInBytes(100000L); + topicResourceGroup.setDispatchRateInMsgs(2000); + topicResourceGroup.setDispatchRateInBytes(200000L); + admin.resourcegroups().createResourceGroup(topicResourceGroupName, topicResourceGroup); + + String topic = newTopicName(); + TopicName topicName = TopicName.get(topic); + String namespace = topicName.getNamespace(); + admin.namespaces().setNamespaceResourceGroup(namespace, namespaceResourceGroupName); + assertEquals(admin.namespaces().getNamespaceResourceGroup(namespace), namespaceResourceGroupName); + + admin.topics().createNonPartitionedTopic(topic); + admin.topicPolicies().setResourceGroup(topic, topicResourceGroupName); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topicPolicies() + .getResourceGroup(topic, true), topicResourceGroupName); + org.apache.pulsar.broker.resourcegroup.ResourceGroup rg = pulsar.getResourceGroupServiceManager() + .getTopicResourceGroup(topicName); + assertNotNull(rg); + assertEquals(rg.resourceGroupName, topicResourceGroupName); + }); + + admin.topicPolicies().removeResourceGroup(topic); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topicPolicies() + .getResourceGroup(topic, true), namespaceResourceGroupName); + org.apache.pulsar.broker.resourcegroup.ResourceGroup rg = pulsar.getResourceGroupServiceManager() + .getTopicResourceGroup(topicName); + assertNull(rg); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index 86dff398f9774..cb408bcbc0cf1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -18,10 +18,13 @@ */ package org.apache.pulsar.broker.resourcegroup; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; -import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; import org.apache.pulsar.broker.service.resource.usage.NetworkUsage; import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -30,13 +33,10 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -import org.testng.Assert; import org.testng.annotations.Test; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; public class ResourceGroupServiceTest extends MockedPulsarServiceBaseTest { @BeforeClass @@ -111,16 +111,18 @@ public void measureOpsTime() throws PulsarAdminException { numPerfTestIterations, diffMsecs, (1000 * (float) diffMsecs)/numPerfTestIterations); // Going through the resource-group service - final String tenantName = "SomeTenant"; - final String namespaceName = "SomeNameSpace"; + final TopicName topicName = TopicName.get("SomeTenant/SomeNameSpace/my-topic"); + rgs.registerTopic(rgName, topicName); + final String tenantName = topicName.getTenant(); + final String namespaceName = topicName.getNamespace(); rgs.registerTenant(rgName, tenantName); - final NamespaceName tenantAndNamespaceName = NamespaceName.get(tenantName, namespaceName); + final NamespaceName tenantAndNamespaceName = topicName.getNamespaceObject(); rgs.registerNameSpace(rgName, tenantAndNamespaceName); mSecsStart = System.currentTimeMillis(); for (int ix = 0; ix < numPerfTestIterations; ix++) { for (int monClassIdx = 0; monClassIdx < ResourceGroupMonitoringClass.values().length; monClassIdx++) { monClass = ResourceGroupMonitoringClass.values()[monClassIdx]; - rgs.incrementUsage(tenantName, namespaceName, monClass, stats); + rgs.incrementUsage(topicName.toString(), tenantName, namespaceName, monClass, stats); } } mSecsEnd = System.currentTimeMillis(); @@ -129,6 +131,7 @@ public void measureOpsTime() throws PulsarAdminException { numPerfTestIterations, diffMsecs, (1000 * (float) diffMsecs)/numPerfTestIterations); rgs.unRegisterTenant(rgName, tenantName); rgs.unRegisterNameSpace(rgName, tenantAndNamespaceName); + rgs.unRegisterTopic(topicName); // The overhead of a RG lookup mSecsStart = System.currentTimeMillis(); @@ -197,6 +200,7 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep final NamespaceName tenantAndNamespace = NamespaceName.get(tenantName, namespaceName); rgs.registerNameSpace(rgName, tenantAndNamespace); + rgs.registerTopic(rgName, topic); // Delete of our valid config should throw until we unref correspondingly. Assert.assertThrows(PulsarAdminException.class, () -> rgs.resourceGroupDelete(rgName)); @@ -231,6 +235,7 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep rgs.unRegisterTenant(rgName, tenantName); rgs.unRegisterNameSpace(rgName, tenantAndNamespace); + rgs.unRegisterTopic(topic); BytesAndMessagesCount publishQuota = rgs.getPublishRateLimiters(rgName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationOnTopicLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationOnTopicLevelTest.java new file mode 100644 index 0000000000000..f5f615709ca11 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationOnTopicLevelTest.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import static org.testng.Assert.assertNotNull; +import com.google.common.collect.Sets; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class ResourceGroupUsageAggregationOnTopicLevelTest extends ProducerConsumerBase { + + private final String TenantName = "pulsar-test"; + private final String NsName = "test"; + private final String TenantAndNsName = TenantName + "/" + NsName; + private final String TestProduceConsumeTopicName = "/test/prod-cons-topic"; + private final String PRODUCE_CONSUME_PERSISTENT_TOPIC = "persistent://" + TenantAndNsName + TestProduceConsumeTopicName; + private final String PRODUCE_CONSUME_NON_PERSISTENT_TOPIC = + "non-persistent://" + TenantAndNsName + TestProduceConsumeTopicName; + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + this.conf.setAllowAutoTopicCreation(true); + + final String clusterName = "test"; + admin.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + admin.tenants().createTenant(TenantName, + new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(clusterName))); + admin.namespaces().createNamespace(TenantAndNsName); + admin.namespaces().setNamespaceReplicationClusters(TenantAndNsName, Sets.newHashSet(clusterName)); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testPersistentTopicProduceConsumeUsageOnRG() throws Exception { + testProduceConsumeUsageOnRG(PRODUCE_CONSUME_PERSISTENT_TOPIC); + } + + @Test + public void testNonPersistentTopicProduceConsumeUsageOnRG() throws Exception { + testProduceConsumeUsageOnRG(PRODUCE_CONSUME_NON_PERSISTENT_TOPIC); + } + + private void testProduceConsumeUsageOnRG(String topicString) throws Exception { + ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() { + @Override + public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes, + long currentMessagesUsed, long lastReportedMessages, + long lastReportTimeMSecsSinceEpoch) { + return false; + } + + @Override + public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) { + return 0; + } + }; + + @Cleanup + ResourceUsageTopicTransportManager transportMgr = new ResourceUsageTopicTransportManager(pulsar); + @Cleanup + ResourceGroupService rgs = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, transportMgr, + dummyQuotaCalc); + + String activeRgName = "runProduceConsume"; + ResourceGroup activeRG; + + ResourceUsagePublisher ruP = new ResourceUsagePublisher() { + @Override + public String getID() { + return rgs.resourceGroupGet(activeRgName).resourceGroupName; + } + + @Override + public void fillResourceUsage(ResourceUsage resourceUsage) { + rgs.resourceGroupGet(activeRgName).rgFillResourceUsage(resourceUsage); + } + }; + + ResourceUsageConsumer ruC = new ResourceUsageConsumer() { + @Override + public String getID() { + return rgs.resourceGroupGet(activeRgName).resourceGroupName; + } + + @Override + public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { + rgs.resourceGroupGet(activeRgName).rgResourceUsageListener(broker, resourceUsage); + } + }; + + org.apache.pulsar.common.policies.data.ResourceGroup rgConfig = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + rgConfig.setPublishRateInBytes(1500L); + rgConfig.setPublishRateInMsgs(100); + rgConfig.setDispatchRateInBytes(4000L); + rgConfig.setPublishRateInMsgs(500); + + rgs.resourceGroupCreate(activeRgName, rgConfig, ruP, ruC); + + activeRG = rgs.resourceGroupGet(activeRgName); + assertNotNull(activeRG); + + String subscriptionName = "my-subscription"; + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicString) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicString) + .create(); + + TopicName myTopic = TopicName.get(topicString); + rgs.unRegisterTopic(myTopic); + rgs.registerTopic(activeRgName,myTopic); + + final int NumMessagesToSend = 10; + int sentNumBytes = 0; + int sentNumMsgs = 0; + for (int ix = 0; ix < NumMessagesToSend; ix++) { + byte[] mesg = String.format("Hi, ix=%s", ix).getBytes(); + producer.send(mesg); + sentNumBytes += mesg.length; + sentNumMsgs++; + } + + this.verifyStats(rgs, topicString, activeRgName, sentNumBytes, sentNumMsgs, 0, 0, + true, false); + + int recvdNumBytes = 0; + int recvdNumMsgs = 0; + + Message message; + while (recvdNumMsgs < sentNumMsgs) { + message = consumer.receive(); + recvdNumBytes += message.getValue().length; + recvdNumMsgs++; + } + + this.verifyStats(rgs,topicString, activeRgName, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, + true, true); + } + + private void verifyStats(ResourceGroupService rgs, String topicString, String rgName, + int sentNumBytes, int sentNumMsgs, + int recvdNumBytes, int recvdNumMsgs, + boolean checkProduce, boolean checkConsume) throws PulsarAdminException { + BrokerService bs = pulsar.getBrokerService(); + Awaitility.await().untilAsserted(() -> { + TopicStatsImpl topicStats = bs.getTopicStats().get(topicString); + assertNotNull(topicStats); + if (checkProduce) { + Assert.assertTrue(topicStats.bytesInCounter >= sentNumBytes); + Assert.assertEquals(sentNumMsgs, topicStats.msgInCounter); + } + if (checkConsume) { + Assert.assertTrue(topicStats.bytesOutCounter >= recvdNumBytes); + Assert.assertEquals(recvdNumMsgs, topicStats.msgOutCounter); + } + }); + if (sentNumMsgs > 0 || recvdNumMsgs > 0) { + rgs.aggregateResourceGroupLocalUsages(); + BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish, + ResourceGroupUsageStatsType.Cumulative); + BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch, + ResourceGroupUsageStatsType.Cumulative); + + if (checkProduce) { + Assert.assertTrue(prodCounts.bytes >= sentNumBytes); + Assert.assertEquals(sentNumMsgs, prodCounts.messages); + } + if (checkConsume) { + Assert.assertTrue(consCounts.bytes >= recvdNumBytes); + Assert.assertEquals(recvdNumMsgs, consCounts.messages); + } + } + } +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java index 1dfb79d7ba0b7..644406fb25def 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; +import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; @@ -1718,4 +1720,60 @@ SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean * @param topic The topic in whose policy should be removed */ CompletableFuture removeSchemaCompatibilityStrategyAsync(String topic); + + /** + * Get the ResourceGroup for a topic. + * + * @param topic Topic name + * @param applied True gets namespace level configuration if ResourceGroup does not exist on the topic. + * False gets topic level configuration. + * @return ResourceGroup + * @throws NotAuthorizedException Don't have admin permission + * @throws NotFoundException Topic does not exist + * @throws PulsarAdminException Unexpected error + */ + String getResourceGroup(String topic, boolean applied) throws PulsarAdminException; + + /** + * Get the ResourceGroup for a topic asynchronously. + * + * @param topic Topic name + */ + CompletableFuture getResourceGroupAsync(String topic, boolean applied); + + /** + * Set the ResourceGroup for a topic. + * + * @param topic Topic name + * @param resourceGroupName ResourceGroup name + * @throws NotAuthorizedException Don't have admin permission + * @throws NotFoundException Topic does not exist + * @throws PulsarAdminException Unexpected error + */ + void setResourceGroup(String topic, String resourceGroupName) throws PulsarAdminException; + + /** + * Set the ResourceGroup for a topic. + * + * @param topic Topic name + * @param resourceGroupName ResourceGroup name + */ + CompletableFuture setResourceGroupAsync(String topic, String resourceGroupName); + + /** + * Remove the ResourceGroup on a topic. + * + * @param topic Topic name + * @throws NotAuthorizedException Don't have admin permission + * @throws NotFoundException Topic does not exist + * @throws PulsarAdminException Unexpected error + */ + void removeResourceGroup(String topic) throws PulsarAdminException; + + /** + * Remove the ResourceGroup on a topic asynchronously. + * + * @param topic Topic name + */ + CompletableFuture removeResourceGroupAsync(String topic); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java index bd3003778076d..60b49507e33fb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java @@ -1420,6 +1420,54 @@ public CompletableFuture removeSchemaCompatibilityStrategyAsync(String top return asyncDeleteRequest(path); } + @Override + public String getResourceGroup(String topic, boolean applied) throws PulsarAdminException { + return sync(() -> getResourceGroupAsync(topic, applied)); + } + + @Override + public CompletableFuture getResourceGroupAsync(String topic, boolean applied) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "resourceGroup"); + path = path.queryParam("applied", applied); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(String rgName) { + future.complete(rgName); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void setResourceGroup(String topic, String resourceGroupName) throws PulsarAdminException { + sync(() -> setResourceGroupAsync(topic, resourceGroupName)); + } + + @Override + public CompletableFuture setResourceGroupAsync(String topic, String resourceGroupName) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "resourceGroup"); + return asyncPostRequest(path, Entity.entity(resourceGroupName, MediaType.APPLICATION_JSON_TYPE)); + } + + @Override + public void removeResourceGroup(String topic) throws PulsarAdminException { + sync(() -> removeResourceGroupAsync(topic)); + } + + @Override + public CompletableFuture removeResourceGroupAsync(String topic) { + return setResourceGroupAsync(topic, null); + } + /* * returns topic name with encoded Local Name */ diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index 9bd6c18b4aec6..f718ba7877138 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -145,6 +145,10 @@ public CmdTopicPolicies(Supplier admin) { jcommander.addCommand("remove-schema-compatibility-strategy", new RemoveSchemaCompatibilityStrategy()); jcommander.addCommand("set-schema-compatibility-strategy", new SetSchemaCompatibilityStrategy()); jcommander.addCommand("get-schema-compatibility-strategy", new GetSchemaCompatibilityStrategy()); + + jcommander.addCommand("set-resource-group", new SetResourceGroup()); + jcommander.addCommand("get-resource-group", new GetResourceGroup()); + jcommander.addCommand("remove-resource-group", new RemoveResourceGroup()); } @Parameters(commandDescription = "Get max consumers per subscription for a topic") @@ -1742,6 +1746,61 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get ResourceGroup for a topic") + private class GetResourceGroup extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--applied", "-a"}, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + print(getTopicPolicies(isGlobal).getResourceGroup(topic, applied)); + } + } + + @Parameters(commandDescription = "Set ResourceGroup for a topic") + private class SetResourceGroup extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Parameter(names = {"--resource-group-name", "-rgn"}, description = "ResourceGroup name", required = true) + private String rgName; + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + getTopicPolicies(isGlobal).setResourceGroup(topic, rgName); + } + } + + @Parameters(commandDescription = "Remove ResourceGroup from a topic") + private class RemoveResourceGroup extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + getTopicPolicies(isGlobal).removeResourceGroup(topic); + } + } + private TopicPolicies getTopicPolicies(boolean isGlobal) { return getAdmin().topicPolicies(isGlobal); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 0532744bec3e6..53b119c039c99 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -58,6 +58,8 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue schemaCompatibilityStrategy; final PolicyHierarchyValue dispatchRate; + final PolicyHierarchyValue resourceGroupName; + public HierarchyTopicPolicies() { replicationClusters = new PolicyHierarchyValue<>(); retentionPolicies = new PolicyHierarchyValue<>(); @@ -86,5 +88,6 @@ public HierarchyTopicPolicies() { subscriptionDispatchRate = new PolicyHierarchyValue<>(); schemaCompatibilityStrategy = new PolicyHierarchyValue<>(); dispatchRate = new PolicyHierarchyValue<>(); + resourceGroupName = new PolicyHierarchyValue<>(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 6e81509c83094..2ec4b18a1dbe1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -71,6 +71,7 @@ public class TopicPolicies { private Integer maxSubscriptionsPerTopic; private DispatchRateImpl replicatorDispatchRate; private SchemaCompatibilityStrategy schemaCompatibilityStrategy; + private String resourceGroupName; public boolean isGlobalPolicies() { return isGlobal != null && isGlobal;