diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index d927b628e9f69..20e7b13590afb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -60,6 +60,7 @@ public static class BytesAndMessagesCount { public enum ResourceGroupMonitoringClass { Publish, Dispatch, + ReplicationDispatch, // Storage; // Punt this for now, until we have a clearer idea of the usage, statistics, etc. } @@ -69,7 +70,8 @@ public enum ResourceGroupMonitoringClass { public enum ResourceGroupRefTypes { Tenants, Namespaces, - Topics + Topics, + Replicators, } // Default ctor: it is not expected that anything outside of this package will need to directly @@ -84,6 +86,8 @@ protected ResourceGroup(ResourceGroupService rgs, String name, this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor()); log.info("attaching publish rate limiter {} to {} get {}", this.resourceGroupPublishLimiter.toString(), name, this.getResourceGroupPublishLimiter()); + this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager + .newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); } // ctor for overriding the transport-manager fill/set buffer. @@ -97,6 +101,8 @@ protected ResourceGroup(ResourceGroupService rgs, String rgName, this.setResourceGroupMonitoringClassFields(); this.setResourceGroupConfigParameters(rgConfig); this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor()); + this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager + .newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); this.ruPublisher = rgPublisher; this.ruConsumer = rgConsumer; } @@ -107,6 +113,7 @@ public ResourceGroup(ResourceGroup other) { this.resourceGroupName = other.resourceGroupName; this.rgs = other.rgs; this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter; + this.resourceGroupReplicationDispatchLimiter = other.resourceGroupReplicationDispatchLimiter; this.setResourceGroupMonitoringClassFields(); // ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer from other, if required. @@ -146,6 +153,7 @@ protected void updateResourceGroup(org.apache.pulsar.common.policies.data.Resour pubBmc.messages = rgConfig.getPublishRateInMsgs(); pubBmc.bytes = rgConfig.getPublishRateInBytes(); this.resourceGroupPublishLimiter.update(pubBmc); + ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, rgConfig); } protected long getResourceGroupNumOfNSRefs() { @@ -230,6 +238,9 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) { p = resourceUsage.setDispatch(); this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p); + p = resourceUsage.setReplicationDispatch(); + this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p); + // Punt storage for now. } @@ -243,6 +254,9 @@ public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage) p = resourceUsage.getDispatch(); this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker); + p = resourceUsage.getReplicationDispatch(); + this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p, broker); + // Punt storage for now. } @@ -360,14 +374,6 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass, BytesAndMessagesCount newQuota) throws PulsarAdminException { - // Only the Publish side is functional now; add the Dispatch side code when the consume side is ready. - if (!ResourceGroupMonitoringClass.Publish.equals(monClass)) { - if (log.isDebugEnabled()) { - log.debug("Doing nothing for monClass={}; only Publish is functional", monClass); - } - return null; - } - this.checkMonitoringClass(monClass); BytesAndMessagesCount oldBMCount; @@ -376,7 +382,18 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo oldBMCount = monEntity.quotaForNextPeriod; try { monEntity.quotaForNextPeriod = newQuota; - this.resourceGroupPublishLimiter.update(newQuota); + switch (monClass) { + case ReplicationDispatch: + ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, newQuota); + break; + case Publish: + this.resourceGroupPublishLimiter.update(newQuota); + break; + default: + if (log.isDebugEnabled()) { + log.debug("Doing nothing for monClass={};", monClass); + } + } } finally { monEntity.localUsageStatsLock.unlock(); } @@ -428,9 +445,16 @@ protected static BytesAndMessagesCount accumulateBMCount(BytesAndMessagesCount . } private void checkMonitoringClass(ResourceGroupMonitoringClass monClass) throws PulsarAdminException { - if (monClass != ResourceGroupMonitoringClass.Publish && monClass != ResourceGroupMonitoringClass.Dispatch) { - String errMesg = "Unexpected monitoring class: " + monClass; - throw new PulsarAdminException(errMesg); + switch (monClass) { + case Publish: + break; + case Dispatch: + break; + case ReplicationDispatch: + break; + default: + String errMesg = "Unexpected monitoring class: " + monClass; + throw new PulsarAdminException(errMesg); } } @@ -575,6 +599,12 @@ private void setResourceGroupConfigParameters(org.apache.pulsar.common.policies. ? -1 : rgConfig.getDispatchRateInBytes(); this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getDispatchRateInMsgs() == null ? -1 : rgConfig.getDispatchRateInMsgs(); + + idx = ResourceGroupMonitoringClass.ReplicationDispatch.ordinal(); + this.monitoringClassFields[idx].configValuesPerPeriod.bytes = rgConfig.getReplicationDispatchRateInBytes() == null + ? -1 : rgConfig.getReplicationDispatchRateInBytes(); + this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getReplicationDispatchRateInMsgs() == null + ? -1 : rgConfig.getReplicationDispatchRateInMsgs(); } private void setDefaultResourceUsageTransportHandlers() { @@ -650,6 +680,12 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { @Getter protected ResourceGroupPublishLimiter resourceGroupPublishLimiter; + @Getter + protected ResourceGroupDispatchLimiter resourceGroupReplicationDispatchLimiter; + + @Getter + protected ResourceGroupDispatchLimiter resourceGroupTopicDispatchLimiter; + protected static class PerMonitoringClassFields { // This lock covers all the "local" counts (i.e., except for the per-broker usage stats). Lock localUsageStatsLock; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java new file mode 100644 index 0000000000000..104890fd8cd46 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.util.RateLimiter; + +public class ResourceGroupDispatchLimiter implements AutoCloseable { + + private final ScheduledExecutorService executorService; + private volatile RateLimiter dispatchRateLimiterOnMessage; + private volatile RateLimiter dispatchRateLimiterOnByte; + + public ResourceGroupDispatchLimiter(ScheduledExecutorService executorService, long dispatchRateInMsgs, long dispatchRateInBytes) { + this.executorService = executorService; + update(dispatchRateInMsgs, dispatchRateInBytes); + } + + public void update(long dispatchRateInMsgs, long dispatchRateInBytes) { + if (dispatchRateInMsgs > 0) { + if (dispatchRateLimiterOnMessage != null) { + this.dispatchRateLimiterOnMessage.setRate(dispatchRateInMsgs); + } else { + this.dispatchRateLimiterOnMessage = + RateLimiter.builder() + .scheduledExecutorService(executorService) + .permits(dispatchRateInMsgs) + .rateTime(1) + .timeUnit(TimeUnit.SECONDS) + .permitUpdater(null) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + } else { + if (this.dispatchRateLimiterOnMessage != null) { + this.dispatchRateLimiterOnMessage.close(); + this.dispatchRateLimiterOnMessage = null; + } + } + + if (dispatchRateInBytes > 0) { + if (dispatchRateLimiterOnByte != null) { + this.dispatchRateLimiterOnByte.setRate(dispatchRateInBytes); + } else { + this.dispatchRateLimiterOnByte = + RateLimiter.builder() + .scheduledExecutorService(executorService) + .permits(dispatchRateInBytes) + .rateTime(1) + .timeUnit(TimeUnit.SECONDS) + .permitUpdater(null) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + } else { + if (this.dispatchRateLimiterOnByte != null) { + this.dispatchRateLimiterOnByte.close(); + this.dispatchRateLimiterOnByte = null; + } + } + } + + /** + * returns available msg-permit if msg-dispatch-throttling is enabled else it returns -1. + * + * @return + */ + public long getAvailableDispatchRateLimitOnMsg() { + return dispatchRateLimiterOnMessage == null ? -1 : dispatchRateLimiterOnMessage.getAvailablePermits(); + } + + /** + * returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1. + * + * @return + */ + public long getAvailableDispatchRateLimitOnByte() { + return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits(); + } + + /** + * It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed. + * + * @param numberOfMessages + * @param byteSize + */ + public void consumeDispatchQuota(long numberOfMessages, long byteSize) { + if (numberOfMessages > 0 && dispatchRateLimiterOnMessage != null) { + dispatchRateLimiterOnMessage.tryAcquire(numberOfMessages); + } + if (byteSize > 0 && dispatchRateLimiterOnByte != null) { + dispatchRateLimiterOnByte.tryAcquire(byteSize); + } + } + + /** + * Checks if dispatch-rate limiting is enabled. + * + * @return + */ + public boolean isDispatchRateLimitingEnabled() { + return dispatchRateLimiterOnMessage != null || dispatchRateLimiterOnByte != null; + } + + public void close() { + if (dispatchRateLimiterOnMessage != null) { + dispatchRateLimiterOnMessage = null; + } + if (dispatchRateLimiterOnByte != null) { + dispatchRateLimiterOnByte = null; + } + } + + /** + * Get configured msg dispatch-throttling rate. Returns -1 if not configured + * + * @return + */ + public long getDispatchRateOnMsg() { + return dispatchRateLimiterOnMessage != null ? dispatchRateLimiterOnMessage.getRate() : -1; + } + + /** + * Get configured byte dispatch-throttling rate. Returns -1 if not configured + * + * @return + */ + public long getDispatchRateOnByte() { + return dispatchRateLimiterOnByte != null ? dispatchRateLimiterOnByte.getRate() : -1; + } + + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java new file mode 100644 index 0000000000000..e1bc7dafe25f6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; + +public class ResourceGroupRateLimiterManager { + + static ResourceGroupDispatchLimiter newReplicationDispatchRateLimiter( + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup, + ScheduledExecutorService executorService) { + long msgs = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInMsgs()).orElse(-1L); + long bytes = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInBytes()).orElse(-1L); + return new ResourceGroupDispatchLimiter(executorService, msgs, bytes); + } + + static void updateReplicationDispatchRateLimiter( + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter, + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup) { + long msgs = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInMsgs()).orElse(-1L); + long bytes = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInBytes()).orElse(-1L); + resourceGroupDispatchLimiter.update(msgs, bytes); + } + + static void updateReplicationDispatchRateLimiter(ResourceGroupDispatchLimiter resourceGroupDispatchLimiter, + BytesAndMessagesCount quota) { + resourceGroupDispatchLimiter.update(quota.messages, quota.bytes); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 9fcdb6790b30b..d642a9d96e2a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -494,8 +494,9 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin // Find the difference between the last time stats were updated for this topic, and the current // time. If the difference is positive, update the stats. - private void updateStatsWithDiff(String topicName, String tenantString, String nsString, - long accByteCount, long accMesgCount, ResourceGroupMonitoringClass monClass) { + @VisibleForTesting + protected void updateStatsWithDiff(String topicName, String replicationRemoteCluster, String tenantString, String nsString, + long accByteCount, long accMesgCount, ResourceGroupMonitoringClass monClass) { ConcurrentHashMap hm; switch (monClass) { default: @@ -509,6 +510,10 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n case Dispatch: hm = this.topicConsumeStats; break; + + case ReplicationDispatch: + hm = this.replicationDispatchStats; + break; } BytesAndMessagesCount bmDiff = new BytesAndMessagesCount(); @@ -518,7 +523,13 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n bmNewCount.bytes = accByteCount; bmNewCount.messages = accMesgCount; - bmOldCount = hm.get(topicName); + String key; + if (monClass == ResourceGroupMonitoringClass.ReplicationDispatch) { + key = topicName + replicationRemoteCluster; + } else { + key = topicName; + } + bmOldCount = hm.get(key); if (bmOldCount == null) { bmDiff.bytes = bmNewCount.bytes; bmDiff.messages = bmNewCount.messages; @@ -539,7 +550,7 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n topicName, monClass, statsUpdated, tenantString, nsString, bmDiff.bytes, bmDiff.messages); } - hm.put(topicName, bmNewCount); + hm.put(key, bmNewCount); } catch (Throwable t) { log.error("updateStatsWithDiff: got ex={} while aggregating for {} side", t.getMessage(), monClass); @@ -636,10 +647,17 @@ protected void aggregateResourceGroupLocalUsages() { continue; } - this.updateStatsWithDiff(topicName, tenantString, nsString, + topicStats.getReplication().forEach((remoteCluster, stats) -> { + this.updateStatsWithDiff(topicName, remoteCluster, tenantString, nsString, + (long) stats.getMsgThroughputOut(), + (long) stats.getMsgRateOut(), + ResourceGroupMonitoringClass.ReplicationDispatch + ); + }); + this.updateStatsWithDiff(topicName, null, tenantString, nsString, topicStats.getBytesInCounter(), topicStats.getMsgInCounter(), ResourceGroupMonitoringClass.Publish); - this.updateStatsWithDiff(topicName, tenantString, nsString, + this.updateStatsWithDiff(topicName, null, tenantString, nsString, topicStats.getBytesOutCounter(), topicStats.getMsgOutCounter(), ResourceGroupMonitoringClass.Dispatch); } @@ -824,6 +842,7 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie // Maps to maintain the usage per topic, in produce/consume directions. private ConcurrentHashMap topicProduceStats = new ConcurrentHashMap<>(); private ConcurrentHashMap topicConsumeStats = new ConcurrentHashMap<>(); + private ConcurrentHashMap replicationDispatchStats = new ConcurrentHashMap<>(); // The task that periodically re-calculates the quota budget for local usage. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 50408b50632e1..65de05e38175a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -45,6 +45,7 @@ public abstract class AbstractReplicator { protected final String remoteCluster; protected final PulsarClientImpl replicationClient; protected final PulsarClientImpl client; + protected String replicatorId; protected volatile ProducerImpl producer; public static final String REPL_PRODUCER_NAME_DELIMITER = "-->"; @@ -80,6 +81,8 @@ public AbstractReplicator(Topic localTopic, String replicatorPrefix, String loca this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); + this.replicatorId = String.format("%s | %s", topicName, localCluster + "-->" + remoteCluster); + this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) // .topic(topicName) .messageRoutingMode(MessageRoutingMode.SinglePartition) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index eea90efb88371..177d95b6805bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -20,6 +20,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; @@ -48,5 +49,9 @@ default Optional getRateLimiter() { return Optional.empty(); } + default Optional getResourceGroupDispatchRateLimiter() { + return Optional.empty(); + } + boolean isConnected(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 4de9dec171769..2d7a7cb0ba7e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -44,6 +44,9 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; @@ -57,6 +60,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.SendCallback; import org.apache.pulsar.common.api.proto.MarkerType; +import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.stats.Rate; @@ -71,6 +75,7 @@ public class PersistentReplicator extends AbstractReplicator protected final ManagedCursor cursor; private Optional dispatchRateLimiter = Optional.empty(); + protected Optional resourceGroupDispatchRateLimiter = Optional.empty(); private final Object dispatchRateLimiterLock = new Object(); private int readBatchSize; @@ -195,29 +200,53 @@ private int getAvailablePermits() { return 0; } + long availablePermitsOnMsg = -1; + // handle rate limit if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) { DispatchRateLimiter rateLimiter = dispatchRateLimiter.get(); - // no permits from rate limit - if (!rateLimiter.hasMessageDispatchPermit()) { + // if dispatch-rate is in msg then read only msg according to available permit + availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); + long availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte(); + if (availablePermitsOnByte == 0 || availablePermitsOnMsg == 0) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] message-read exceeded topic replicator message-rate {}/{}," + log.debug("[{}] message-read exceeded topic replicator message-rate {}/{}," + " schedule after a {}", - topicName, localCluster, remoteCluster, + replicatorId, rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(), MESSAGE_RATE_BACKOFF_MS); } return -1; } + } - // if dispatch-rate is in msg then read only msg according to available permit - long availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); - if (availablePermitsOnMsg > 0) { - availablePermits = Math.min(availablePermits, (int) availablePermitsOnMsg); + if (resourceGroupDispatchRateLimiter.isPresent()) { + ResourceGroupDispatchLimiter rateLimiter = resourceGroupDispatchRateLimiter.get(); + long rgAvailablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); + long rgAvailablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte(); + if (rgAvailablePermitsOnMsg == 0 || rgAvailablePermitsOnByte == 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] message-read exceeded resourcegroup message-rate {}/{}," + + " schedule after a {}", + replicatorId, + rateLimiter.getDispatchRateOnMsg(), + rateLimiter.getDispatchRateOnByte(), + MESSAGE_RATE_BACKOFF_MS); + } + return -1; + } + if (availablePermitsOnMsg == -1) { + availablePermitsOnMsg = rgAvailablePermitsOnMsg; + } else { + availablePermitsOnMsg = Math.min(rgAvailablePermitsOnMsg, availablePermitsOnMsg); } } + if (availablePermitsOnMsg > 0) { + availablePermits = Math.min(availablePermits, (int) availablePermitsOnMsg); + } + return availablePermits; } @@ -751,6 +780,11 @@ public Optional getRateLimiter() { return dispatchRateLimiter; } + @Override + public Optional getResourceGroupDispatchRateLimiter() { + return resourceGroupDispatchRateLimiter; + } + @Override public void initializeDispatchRateLimiterIfNeeded() { synchronized (dispatchRateLimiterLock) { @@ -758,13 +792,21 @@ public void initializeDispatchRateLimiterIfNeeded() { && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) { this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR)); } - } - } - @Override - public void updateRateLimiter() { - initializeDispatchRateLimiterIfNeeded(); - dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate); + ResourceGroupService resourceGroupService = brokerService.getPulsar().getResourceGroupServiceManager(); + HierarchyTopicPolicies hierarchyTopicPolicies = topic.getHierarchyTopicPolicies(); + String resourceGroupName = hierarchyTopicPolicies.getResourceGroupName().get(); + if (resourceGroupName != null) { + ResourceGroup resourceGroup = resourceGroupService.resourceGroupGet(resourceGroupName); + if (resourceGroup != null) { + resourceGroupDispatchRateLimiter = Optional.of(resourceGroup.getResourceGroupReplicationDispatchLimiter()); + } + } else { + if (resourceGroupDispatchRateLimiter.isPresent()) { + resourceGroupDispatchRateLimiter = Optional.empty(); + } + } + } } private void checkReplicatedSubscriptionMarker(Position position, MessageImpl msg, ByteBuf payload) { diff --git a/pulsar-broker/src/main/proto/ResourceUsage.proto b/pulsar-broker/src/main/proto/ResourceUsage.proto index 4706c9dfbcd19..ac7e1dea23457 100644 --- a/pulsar-broker/src/main/proto/ResourceUsage.proto +++ b/pulsar-broker/src/main/proto/ResourceUsage.proto @@ -38,6 +38,7 @@ message ResourceUsage { optional NetworkUsage publish = 2; optional NetworkUsage dispatch = 3; optional StorageUsage storage = 4; + optional NetworkUsage replicationDispatch = 6; } message ResourceUsageInfo { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java new file mode 100644 index 0000000000000..fc9862456d41a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import lombok.Cleanup; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; +import org.testng.annotations.Test; + +public class ResourceGroupRateLimiterManagerTest { + + @Test + public void testNewReplicationDispatchRateLimiterWithEmptyResourceGroup() { + org.apache.pulsar.common.policies.data.ResourceGroup emptyResourceGroup = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + + @Cleanup(value = "shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Cleanup + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager.newReplicationDispatchRateLimiter(emptyResourceGroup,executorService); + assertFalse(resourceGroupDispatchLimiter.isDispatchRateLimitingEnabled()); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg(), -1L); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), -1L); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte(), -1L); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), -1L); + } + + @Test + public void testReplicationDispatchRateLimiterOnMsgs() { + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + resourceGroup.setReplicationDispatchRateInMsgs(10L); + @Cleanup(value = "shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Cleanup + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager.newReplicationDispatchRateLimiter(resourceGroup,executorService); + assertTrue(resourceGroupDispatchLimiter.isDispatchRateLimitingEnabled()); + + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue()); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue()); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte(), -1L); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), -1L); + } + + @Test + public void testReplicationDispatchRateLimiterOnBytes() { + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + resourceGroup.setReplicationDispatchRateInBytes(20L); + @Cleanup(value = "shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Cleanup + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager.newReplicationDispatchRateLimiter(resourceGroup,executorService); + assertTrue(resourceGroupDispatchLimiter.isDispatchRateLimitingEnabled()); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg(), -1L); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), -1L); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue()); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue()); + } + + @Test + public void testUpdateReplicationDispatchRateLimiter() { + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + resourceGroup.setReplicationDispatchRateInMsgs(10L); + resourceGroup.setReplicationDispatchRateInBytes(100L); + + @Cleanup(value = "shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Cleanup + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager.newReplicationDispatchRateLimiter(resourceGroup,executorService); + + BytesAndMessagesCount quota = new BytesAndMessagesCount(); + quota.messages = 20; + quota.bytes = 200; + ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupDispatchLimiter, quota); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte(), quota.bytes); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), quota.bytes); + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg(), quota.messages); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), quota.messages); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index cb408bcbc0cf1..5d4bce643e6df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -156,6 +156,8 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep rgConfig.setPublishRateInMsgs(100); rgConfig.setDispatchRateInBytes(40000L); rgConfig.setDispatchRateInMsgs(500); + rgConfig.setReplicationDispatchRateInBytes(2000L); + rgConfig.setReplicationDispatchRateInMsgs(400L); int initialNumQuotaCalculations = numAnonymousQuotaCalculations; rgs.resourceGroupCreate(rgName, rgConfig); @@ -170,6 +172,8 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep rgConfig.setPublishRateInMsgs(rgConfig.getPublishRateInMsgs()*10); rgConfig.setDispatchRateInBytes(rgConfig.getDispatchRateInBytes()/10); rgConfig.setDispatchRateInMsgs(rgConfig.getDispatchRateInMsgs()/10); + rgConfig.setReplicationDispatchRateInBytes(rgConfig.getReplicationDispatchRateInBytes()/10); + rgConfig.setReplicationDispatchRateInMsgs(rgConfig.getReplicationDispatchRateInMsgs()/10); rgs.resourceGroupUpdate(rgName, rgConfig); Assert.assertEquals(rgs.getNumResourceGroups(), 1); @@ -187,6 +191,9 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep monClassFields = retRG.monitoringClassFields[ResourceGroupMonitoringClass.Dispatch.ordinal()]; Assert.assertEquals(monClassFields.configValuesPerPeriod.bytes, rgConfig.getDispatchRateInBytes().longValue()); Assert.assertEquals(monClassFields.configValuesPerPeriod.messages, rgConfig.getDispatchRateInMsgs().intValue()); + monClassFields = retRG.monitoringClassFields[ResourceGroupMonitoringClass.ReplicationDispatch.ordinal()]; + Assert.assertEquals(monClassFields.configValuesPerPeriod.bytes, rgConfig.getReplicationDispatchRateInBytes().longValue()); + Assert.assertEquals(monClassFields.configValuesPerPeriod.messages, rgConfig.getReplicationDispatchRateInMsgs().intValue()); Assert.assertThrows(PulsarAdminException.class, () -> rgs.resourceGroupDelete(randomRgName)); @@ -217,6 +224,10 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep // Gross hack! if (monClass == ResourceGroupMonitoringClass.Publish) { nwUsage = usage.setPublish(); + } else if (monClass == ResourceGroupMonitoringClass.Dispatch) { + nwUsage = usage.setDispatch(); + } else if (monClass == ResourceGroupMonitoringClass.ReplicationDispatch) { + nwUsage = usage.setReplicationDispatch(); } else { nwUsage = usage.setDispatch(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java index 4e54d1b3326ca..bcc663342baa8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java @@ -79,7 +79,7 @@ public void fillResourceUsage(ResourceUsage resourceUsage) { resourceUsage.setOwner(getID()); resourceUsage.setPublish().setMessagesPerPeriod(1000).setBytesPerPeriod(10001); resourceUsage.setStorage().setTotalBytes(500003); - + resourceUsage.setReplicationDispatch().setMessagesPerPeriod(2000).setBytesPerPeriod(4000); } }; @@ -98,6 +98,10 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { p.setBytesPerPeriod(resourceUsage.getPublish().getBytesPerPeriod()); p.setMessagesPerPeriod(resourceUsage.getPublish().getMessagesPerPeriod()); + p = recvdUsage.setReplicationDispatch(); + p.setBytesPerPeriod(resourceUsage.getReplicationDispatch().getBytesPerPeriod()); + p.setMessagesPerPeriod(resourceUsage.getReplicationDispatch().getMessagesPerPeriod()); + recvdUsage.setStorage().setTotalBytes(resourceUsage.getStorage().getTotalBytes()); } }; @@ -112,6 +116,8 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { assertNotNull(recvdUsage.getStorage()); assertEquals(recvdUsage.getPublish().getBytesPerPeriod(), 10001); assertEquals(recvdUsage.getStorage().getTotalBytes(), 500003); + assertEquals(recvdUsage.getReplicationDispatch().getBytesPerPeriod(), 4000); + assertEquals(recvdUsage.getReplicationDispatch().getMessagesPerPeriod(), 2000); } private void prepareData() throws PulsarServerException, PulsarAdminException, PulsarClientException { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index fdf27adc71875..5b7ea15409757 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -19,20 +19,24 @@ package org.apache.pulsar.broker.service; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertFalse; import com.google.common.collect.Sets; import java.lang.reflect.Method; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.ResourceGroup; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +102,7 @@ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { // rate limiter disable by default assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(topic.getReplicators().values().get(0).getResourceGroupDispatchRateLimiter().isPresent()); //set topic-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -119,6 +124,24 @@ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1); assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), -1L); + + // ResourceGroupDispatchRateLimiter + String resourceGroupName = UUID.randomUUID().toString(); + ResourceGroup resourceGroup = new ResourceGroup(); + resourceGroup.setReplicationDispatchRateInBytes(10L); + resourceGroup.setReplicationDispatchRateInMsgs(20L); + admin1.resourcegroups().createResourceGroup(resourceGroupName, resourceGroup); + Awaitility.await().untilAsserted(() -> assertNotNull(admin1.resourcegroups() + .getResourceGroup(resourceGroupName))); + admin1.topicPolicies().setResourceGroup(topicName, resourceGroupName); + + Replicator replicator = topic.getReplicators().values().get(0); + Awaitility.await().untilAsserted(() -> { + assertTrue(replicator.getResourceGroupDispatchRateLimiter().isPresent()); + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = replicator.getResourceGroupDispatchRateLimiter().get(); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue()); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue()); + }); } @Test @@ -142,6 +165,7 @@ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception { // rate limiter disable by default assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(topic.getReplicators().values().get(0).getResourceGroupDispatchRateLimiter().isPresent()); //set namespace-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -163,6 +187,24 @@ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception { assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1); assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), -1L); + + // ResourceGroupDispatchRateLimiter + String resourceGroupName = UUID.randomUUID().toString(); + ResourceGroup resourceGroup = new ResourceGroup(); + resourceGroup.setReplicationDispatchRateInBytes(10L); + resourceGroup.setReplicationDispatchRateInMsgs(20L); + admin1.resourcegroups().createResourceGroup(resourceGroupName, resourceGroup); + Awaitility.await().untilAsserted(() -> assertNotNull(admin1.resourcegroups() + .getResourceGroup(resourceGroupName))); + admin1.namespaces().setNamespaceResourceGroup(namespace, resourceGroupName); + + Replicator replicator = topic.getReplicators().values().get(0); + Awaitility.await().untilAsserted(() -> { + assertTrue(replicator.getResourceGroupDispatchRateLimiter().isPresent()); + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = replicator.getResourceGroupDispatchRateLimiter().get(); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue()); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue()); + }); } @Test @@ -547,5 +589,55 @@ public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Excepti producer.close(); } + @Test + public void testResourceGroupReplicatorRateLimiter() throws Exception { + final String namespace = "pulsar/replicatormsg-" + System.currentTimeMillis(); + final String topicName = "persistent://" + namespace + "/" + UUID.randomUUID(); + + admin1.namespaces().createNamespace(namespace); + // 0. set 2 clusters, there will be 1 replicator in each topic + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + // ResourceGroupDispatchRateLimiter + int messageRate = 100; + String resourceGroupName = UUID.randomUUID().toString(); + ResourceGroup resourceGroup = new ResourceGroup(); + resourceGroup.setReplicationDispatchRateInMsgs((long) messageRate); + admin1.resourcegroups().createResourceGroup(resourceGroupName, resourceGroup); + Awaitility.await().untilAsserted(() -> assertNotNull(admin1.resourcegroups() + .getResourceGroup(resourceGroupName))); + admin1.namespaces().setNamespaceResourceGroup(namespace, resourceGroupName); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + @Cleanup + Producer producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + final AtomicInteger totalReceived = new AtomicInteger(0); + + @Cleanup + Consumer consumer = client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2").messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + totalReceived.incrementAndGet(); + }).subscribe(); + + int numMessages = 500; + for (int i = 0; i < numMessages; i++) { + producer.send(new byte[80]); + } + + Assert.assertTrue(totalReceived.get() < messageRate * 2); + } + private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java index bbed3dbb9b825..b023e9329301b 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java @@ -75,6 +75,8 @@ public interface ResourceGroups { * "PublishRateInBytes" : "value", * "DispatchRateInMsgs" : "value", * "DispatchRateInBytes" : "value" + * "ReplicationDispatchRateInMsgs" : "value" + * "ReplicationDispatchRateInBytes" : "value" * * * @@ -101,6 +103,8 @@ public interface ResourceGroups { * "PublishRateInBytes" : "value", * "DispatchRateInMsgs" : "value", * "DspatchRateInBytes" : "value" + * "ReplicationDispatchRateInMsgs" : "value" + * "ReplicationDispatchRateInBytes" : "value" * * * diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java index 0bfc38637c10f..3cef555f1981e 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java @@ -26,4 +26,7 @@ public class ResourceGroup { private Long publishRateInBytes; private Integer dispatchRateInMsgs; private Long dispatchRateInBytes; + + private Long replicationDispatchRateInMsgs; + private Long replicationDispatchRateInBytes; } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java index afc9fc1c3aba7..3d44e30ee4293 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java @@ -73,6 +73,14 @@ private class Create extends CliCommand { + "(default -1 will be overwrite if not passed)", required = false) private Long dispatchRateInBytes; + @Parameter(names = {"--replication-msg-dispatch-rate"}, description = "replication-msg-dispatch-rate " + + "(default -1 will be overwrite if not passed)", required = false) + private Long replicationDispatchRateInMsgs; + + @Parameter(names = {"--replication-byte-dispatch-rate"}, description = "replication-byte-dispatch-rate " + + "(default -1 will be overwrite if not passed)", required = false) + private Long replicationDispatchRateInBytes; + @Override void run() throws PulsarAdminException { String name = getOneArgument(params); @@ -82,6 +90,8 @@ void run() throws PulsarAdminException { resourcegroup.setDispatchRateInBytes(dispatchRateInBytes); resourcegroup.setPublishRateInMsgs(publishRateInMsgs); resourcegroup.setPublishRateInBytes(publishRateInBytes); + resourcegroup.setReplicationDispatchRateInMsgs(replicationDispatchRateInMsgs); + resourcegroup.setReplicationDispatchRateInBytes(replicationDispatchRateInBytes); getAdmin().resourcegroups().createResourceGroup(name, resourcegroup); } } @@ -108,6 +118,14 @@ private class Update extends CliCommand { "-bd" }, description = "byte-dispatch-rate ", required = false) private Long dispatchRateInBytes; + @Parameter(names = {"--replication-msg-dispatch-rate"}, description = "replication-msg-dispatch-rate " + + "(default -1 will be overwrite if not passed)", required = false) + private Long replicationDispatchRateInMsgs; + + @Parameter(names = {"--replication-byte-dispatch-rate"}, description = "replication-byte-dispatch-rate " + + "(default -1 will be overwrite if not passed)", required = false) + private Long replicationDispatchRateInBytes; + @Override void run() throws PulsarAdminException { String name = getOneArgument(params); @@ -117,6 +135,8 @@ void run() throws PulsarAdminException { resourcegroup.setDispatchRateInBytes(dispatchRateInBytes); resourcegroup.setPublishRateInMsgs(publishRateInMsgs); resourcegroup.setPublishRateInBytes(publishRateInBytes); + resourcegroup.setReplicationDispatchRateInMsgs(replicationDispatchRateInMsgs); + resourcegroup.setReplicationDispatchRateInBytes(replicationDispatchRateInBytes); getAdmin().resourcegroups().updateResourceGroup(name, resourcegroup); }