Skip to content

Commit

Permalink
[feat][broker] Add ResourceGroup-based dispatch rate limits to the Re…
Browse files Browse the repository at this point in the history
…plicator

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Mar 6, 2024
1 parent e908106 commit 46556df
Show file tree
Hide file tree
Showing 15 changed files with 582 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static class BytesAndMessagesCount {
public enum ResourceGroupMonitoringClass {
Publish,
Dispatch,
ReplicationDispatch,
// Storage; // Punt this for now, until we have a clearer idea of the usage, statistics, etc.
}

Expand All @@ -69,7 +70,8 @@ public enum ResourceGroupMonitoringClass {
public enum ResourceGroupRefTypes {
Tenants,
Namespaces,
Topics
Topics,
Replicators,
}

// Default ctor: it is not expected that anything outside of this package will need to directly
Expand All @@ -84,6 +86,8 @@ protected ResourceGroup(ResourceGroupService rgs, String name,
this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
log.info("attaching publish rate limiter {} to {} get {}", this.resourceGroupPublishLimiter.toString(), name,
this.getResourceGroupPublishLimiter());
this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager
.newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
}

// ctor for overriding the transport-manager fill/set buffer.
Expand All @@ -97,6 +101,8 @@ protected ResourceGroup(ResourceGroupService rgs, String rgName,
this.setResourceGroupMonitoringClassFields();
this.setResourceGroupConfigParameters(rgConfig);
this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager
.newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.ruPublisher = rgPublisher;
this.ruConsumer = rgConsumer;
}
Expand All @@ -107,6 +113,7 @@ public ResourceGroup(ResourceGroup other) {
this.resourceGroupName = other.resourceGroupName;
this.rgs = other.rgs;
this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter;
this.resourceGroupReplicationDispatchLimiter = other.resourceGroupReplicationDispatchLimiter;
this.setResourceGroupMonitoringClassFields();

// ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer from other, if required.
Expand Down Expand Up @@ -146,6 +153,7 @@ protected void updateResourceGroup(org.apache.pulsar.common.policies.data.Resour
pubBmc.messages = rgConfig.getPublishRateInMsgs();
pubBmc.bytes = rgConfig.getPublishRateInBytes();
this.resourceGroupPublishLimiter.update(pubBmc);
ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, rgConfig);
}

protected long getResourceGroupNumOfNSRefs() {
Expand Down Expand Up @@ -230,6 +238,9 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) {
p = resourceUsage.setDispatch();
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p);

p = resourceUsage.setReplicationDispatch();
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p);

// Punt storage for now.
}

Expand All @@ -243,6 +254,9 @@ public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage)
p = resourceUsage.getDispatch();
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker);

p = resourceUsage.getReplicationDispatch();
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p, broker);

// Punt storage for now.
}

Expand Down Expand Up @@ -360,14 +374,6 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass

protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount newQuota) throws PulsarAdminException {
// Only the Publish side is functional now; add the Dispatch side code when the consume side is ready.
if (!ResourceGroupMonitoringClass.Publish.equals(monClass)) {
if (log.isDebugEnabled()) {
log.debug("Doing nothing for monClass={}; only Publish is functional", monClass);
}
return null;
}

this.checkMonitoringClass(monClass);
BytesAndMessagesCount oldBMCount;

Expand All @@ -376,7 +382,18 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo
oldBMCount = monEntity.quotaForNextPeriod;
try {
monEntity.quotaForNextPeriod = newQuota;
this.resourceGroupPublishLimiter.update(newQuota);
switch (monClass) {
case ReplicationDispatch:
ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, newQuota);
break;
case Publish:
this.resourceGroupPublishLimiter.update(newQuota);
break;
default:
if (log.isDebugEnabled()) {
log.debug("Doing nothing for monClass={};", monClass);
}
}
} finally {
monEntity.localUsageStatsLock.unlock();
}
Expand Down Expand Up @@ -428,9 +445,16 @@ protected static BytesAndMessagesCount accumulateBMCount(BytesAndMessagesCount .
}

private void checkMonitoringClass(ResourceGroupMonitoringClass monClass) throws PulsarAdminException {
if (monClass != ResourceGroupMonitoringClass.Publish && monClass != ResourceGroupMonitoringClass.Dispatch) {
String errMesg = "Unexpected monitoring class: " + monClass;
throw new PulsarAdminException(errMesg);
switch (monClass) {
case Publish:
break;
case Dispatch:
break;
case ReplicationDispatch:
break;
default:
String errMesg = "Unexpected monitoring class: " + monClass;
throw new PulsarAdminException(errMesg);
}
}

Expand Down Expand Up @@ -575,6 +599,12 @@ private void setResourceGroupConfigParameters(org.apache.pulsar.common.policies.
? -1 : rgConfig.getDispatchRateInBytes();
this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getDispatchRateInMsgs() == null
? -1 : rgConfig.getDispatchRateInMsgs();

idx = ResourceGroupMonitoringClass.ReplicationDispatch.ordinal();
this.monitoringClassFields[idx].configValuesPerPeriod.bytes = rgConfig.getReplicationDispatchRateInBytes() == null
? -1 : rgConfig.getReplicationDispatchRateInBytes();
this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getReplicationDispatchRateInMsgs() == null
? -1 : rgConfig.getReplicationDispatchRateInMsgs();
}

private void setDefaultResourceUsageTransportHandlers() {
Expand Down Expand Up @@ -650,6 +680,12 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
@Getter
protected ResourceGroupPublishLimiter resourceGroupPublishLimiter;

@Getter
protected ResourceGroupDispatchLimiter resourceGroupReplicationDispatchLimiter;

@Getter
protected ResourceGroupDispatchLimiter resourceGroupTopicDispatchLimiter;

protected static class PerMonitoringClassFields {
// This lock covers all the "local" counts (i.e., except for the per-broker usage stats).
Lock localUsageStatsLock;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resourcegroup;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.util.RateLimiter;

public class ResourceGroupDispatchLimiter implements AutoCloseable {

private final ScheduledExecutorService executorService;
private volatile RateLimiter dispatchRateLimiterOnMessage;
private volatile RateLimiter dispatchRateLimiterOnByte;

public ResourceGroupDispatchLimiter(ScheduledExecutorService executorService, long dispatchRateInMsgs, long dispatchRateInBytes) {
this.executorService = executorService;
update(dispatchRateInMsgs, dispatchRateInBytes);
}

public void update(long dispatchRateInMsgs, long dispatchRateInBytes) {
if (dispatchRateInMsgs > 0) {
if (dispatchRateLimiterOnMessage != null) {
this.dispatchRateLimiterOnMessage.setRate(dispatchRateInMsgs);
} else {
this.dispatchRateLimiterOnMessage =
RateLimiter.builder()
.scheduledExecutorService(executorService)
.permits(dispatchRateInMsgs)
.rateTime(1)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(null)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
}
} else {
if (this.dispatchRateLimiterOnMessage != null) {
this.dispatchRateLimiterOnMessage.close();
this.dispatchRateLimiterOnMessage = null;
}
}

if (dispatchRateInBytes > 0) {
if (dispatchRateLimiterOnByte != null) {
this.dispatchRateLimiterOnByte.setRate(dispatchRateInBytes);
} else {
this.dispatchRateLimiterOnByte =
RateLimiter.builder()
.scheduledExecutorService(executorService)
.permits(dispatchRateInBytes)
.rateTime(1)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(null)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
}
} else {
if (this.dispatchRateLimiterOnByte != null) {
this.dispatchRateLimiterOnByte.close();
this.dispatchRateLimiterOnByte = null;
}
}
}

/**
* returns available msg-permit if msg-dispatch-throttling is enabled else it returns -1.
*
* @return
*/
public long getAvailableDispatchRateLimitOnMsg() {
return dispatchRateLimiterOnMessage == null ? -1 : dispatchRateLimiterOnMessage.getAvailablePermits();
}

/**
* returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1.
*
* @return
*/
public long getAvailableDispatchRateLimitOnByte() {
return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits();
}

/**
* It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
*
* @param numberOfMessages
* @param byteSize
*/
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
if (numberOfMessages > 0 && dispatchRateLimiterOnMessage != null) {
dispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
}
if (byteSize > 0 && dispatchRateLimiterOnByte != null) {
dispatchRateLimiterOnByte.tryAcquire(byteSize);
}
}

/**
* Checks if dispatch-rate limiting is enabled.
*
* @return
*/
public boolean isDispatchRateLimitingEnabled() {
return dispatchRateLimiterOnMessage != null || dispatchRateLimiterOnByte != null;
}

public void close() {
if (dispatchRateLimiterOnMessage != null) {
dispatchRateLimiterOnMessage = null;
}
if (dispatchRateLimiterOnByte != null) {
dispatchRateLimiterOnByte = null;
}
}

/**
* Get configured msg dispatch-throttling rate. Returns -1 if not configured
*
* @return
*/
public long getDispatchRateOnMsg() {
return dispatchRateLimiterOnMessage != null ? dispatchRateLimiterOnMessage.getRate() : -1;
}

/**
* Get configured byte dispatch-throttling rate. Returns -1 if not configured
*
* @return
*/
public long getDispatchRateOnByte() {
return dispatchRateLimiterOnByte != null ? dispatchRateLimiterOnByte.getRate() : -1;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resourcegroup;

import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;

public class ResourceGroupRateLimiterManager {

static ResourceGroupDispatchLimiter newReplicationDispatchRateLimiter(
org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup,
ScheduledExecutorService executorService) {
long msgs = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInMsgs()).orElse(-1L);
long bytes = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInBytes()).orElse(-1L);
return new ResourceGroupDispatchLimiter(executorService, msgs, bytes);
}

static void updateReplicationDispatchRateLimiter(
ResourceGroupDispatchLimiter resourceGroupDispatchLimiter,
org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup) {
long msgs = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInMsgs()).orElse(-1L);
long bytes = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInBytes()).orElse(-1L);
resourceGroupDispatchLimiter.update(msgs, bytes);
}

static void updateReplicationDispatchRateLimiter(ResourceGroupDispatchLimiter resourceGroupDispatchLimiter,
BytesAndMessagesCount quota) {
resourceGroupDispatchLimiter.update(quota.messages, quota.bytes);
}
}
Loading

0 comments on commit 46556df

Please sign in to comment.