Skip to content

Commit

Permalink
[improve][broker] Improve ResourceGroup
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed May 13, 2024
1 parent 31e92da commit e3b2d33
Show file tree
Hide file tree
Showing 13 changed files with 485 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,33 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int resourceUsageTransportPublishIntervalInSecs = 60;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "The percentage difference that is considered \"within limits\" to suppress usage reporting"
+ "Setting this to 0 will also make us report in every round."
)
private int resourceUsageReportSuppressionTolerancePercentage = 5;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "The maximum number of successive rounds that we can suppress reporting local usage, because there "
+ "was no substantial change from the prior round. This is to ensure the reporting does not "
+ "become too chatty. Set this value to one more than the cadence of sending reports; e.g., if "
+ "you want to send every 3rd report, set the value to 4."
+ "Setting this to 0 will make us report in every round."
+ "Don't set to negative values; behavior will be disabled"
)
private int resourceUsageMaxUsageReportSuppressRounds = 5;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "ResourceGroup rate limit will be triggered when the total traffic exceeds the product of the "
+ "rate-limit value and the threshold. Value range: [0,1]."
)
private double resourceGroupLocalQuotaThreshold = 0;

// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.ToString;
import lombok.val;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupOpStatus;
import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
Expand All @@ -49,6 +53,7 @@ public class ResourceGroup {
/**
* Convenience class for bytes and messages counts, which are used together in a lot of the following code.
*/
@ToString
public static class BytesAndMessagesCount {
public long bytes;
public long messages;
Expand Down Expand Up @@ -356,7 +361,7 @@ protected BytesAndMessagesCount getLocalUsageStatsFromBrokerReports(ResourceGrou

monEntity.usageFromOtherBrokersLock.lock();
try {
pbus = monEntity.usageFromOtherBrokers.get(myBrokerId);
pbus = monEntity.usageFromOtherBrokers.getIfPresent(myBrokerId);
} finally {
monEntity.usageFromOtherBrokersLock.unlock();
}
Expand All @@ -380,7 +385,7 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass
monEntity.usageFromOtherBrokersLock.lock();
BytesAndMessagesCount retStats = new BytesAndMessagesCount();
try {
monEntity.usageFromOtherBrokers.forEach((broker, brokerUsage) -> {
monEntity.usageFromOtherBrokers.asMap().forEach((broker, brokerUsage) -> {
retStats.bytes += brokerUsage.usedValues.bytes;
retStats.messages += brokerUsage.usedValues.messages;
});
Expand Down Expand Up @@ -512,8 +517,6 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas
monEntity.lastReportedValues.bytes = bytesUsed;
monEntity.lastReportedValues.messages = messagesUsed;
monEntity.numSuppressedUsageReports = 0;
monEntity.totalUsedLocally.bytes += bytesUsed;
monEntity.totalUsedLocally.messages += messagesUsed;
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
} else {
numSuppressions = monEntity.numSuppressedUsageReports++;
Expand Down Expand Up @@ -552,7 +555,7 @@ private void getUsageFromMonitoredEntity(ResourceGroupMonitoringClass monClass,
long newByteCount, newMessageCount;

monEntity = this.monitoringClassFields[idx];
usageStats = monEntity.usageFromOtherBrokers.get(broker);
usageStats = monEntity.usageFromOtherBrokers.getIfPresent(broker);
if (usageStats == null) {
usageStats = new PerBrokerUsageStats();
usageStats.usedValues = new BytesAndMessagesCount();
Expand All @@ -564,7 +567,8 @@ private void getUsageFromMonitoredEntity(ResourceGroupMonitoringClass monClass,
newMessageCount = p.getMessagesPerPeriod();
usageStats.usedValues.messages = newMessageCount;
usageStats.lastResourceUsageReadTimeMSecsSinceEpoch = System.currentTimeMillis();
oldUsageStats = monEntity.usageFromOtherBrokers.put(broker, usageStats);
oldUsageStats = monEntity.usageFromOtherBrokers.getIfPresent(broker);
monEntity.usageFromOtherBrokers.put(broker, usageStats);
} finally {
monEntity.usageFromOtherBrokersLock.unlock();
}
Expand All @@ -587,21 +591,18 @@ private void getUsageFromMonitoredEntity(ResourceGroupMonitoringClass monClass,
}

private void setResourceGroupMonitoringClassFields() {
PerMonitoringClassFields monClassFields;
ServiceConfiguration conf = rgs.getPulsar().getConfiguration();
long resourceUsageTransportPublishIntervalInSecs = conf.getResourceUsageTransportPublishIntervalInSecs();
int maxUsageReportSuppressRounds = Math.max(conf.getResourceUsageMaxUsageReportSuppressRounds(), 1);
long cacheInMS = TimeUnit.SECONDS.toMillis(
resourceUsageTransportPublishIntervalInSecs * maxUsageReportSuppressRounds * 2);
// Usage report data is cached to the memory, when the broker is restart or offline, we need an elimination
// strategy to release the quota occupied by other broker.
//
// Considering that each broker starts at a different time, the cache time should be equal to the mandatory
// reporting period * 2.
for (int idx = 0; idx < ResourceGroupMonitoringClass.values().length; idx++) {
this.monitoringClassFields[idx] = new PerMonitoringClassFields();

monClassFields = this.monitoringClassFields[idx];
monClassFields.configValuesPerPeriod = new BytesAndMessagesCount();
monClassFields.usedLocallySinceLastReport = new BytesAndMessagesCount();
monClassFields.lastReportedValues = new BytesAndMessagesCount();
monClassFields.quotaForNextPeriod = new BytesAndMessagesCount();
monClassFields.totalUsedLocally = new BytesAndMessagesCount();
monClassFields.usageFromOtherBrokers = new HashMap<>();

monClassFields.usageFromOtherBrokersLock = new ReentrantLock();
// ToDo: Change the following to a ReadWrite lock if needed.
monClassFields.localUsageStatsLock = new ReentrantLock();
this.monitoringClassFields[idx] = PerMonitoringClassFields.create(cacheInMS);
}
}

Expand Down Expand Up @@ -737,11 +738,33 @@ protected static class PerMonitoringClassFields {
int numSuppressedUsageReports;

// Accumulated stats of local usage.
@VisibleForTesting
BytesAndMessagesCount totalUsedLocally;

// This lock covers all the non-local usage counts, received from other brokers.
Lock usageFromOtherBrokersLock;
public HashMap<String, PerBrokerUsageStats> usageFromOtherBrokers;
public Cache<String, PerBrokerUsageStats> usageFromOtherBrokers;

private PerMonitoringClassFields(){

}

static PerMonitoringClassFields create(long durationMs) {
PerMonitoringClassFields perMonitoringClassFields = new PerMonitoringClassFields();
perMonitoringClassFields.configValuesPerPeriod = new BytesAndMessagesCount();
perMonitoringClassFields.usedLocallySinceLastReport = new BytesAndMessagesCount();
perMonitoringClassFields.lastReportedValues = new BytesAndMessagesCount();
perMonitoringClassFields.quotaForNextPeriod = new BytesAndMessagesCount();
perMonitoringClassFields.totalUsedLocally = new BytesAndMessagesCount();
perMonitoringClassFields.usageFromOtherBrokersLock = new ReentrantLock();
// ToDo: Change the following to a ReadWrite lock if needed.
perMonitoringClassFields.localUsageStatsLock = new ReentrantLock();

perMonitoringClassFields.usageFromOtherBrokers = Caffeine.newBuilder()
.expireAfterWrite(durationMs, TimeUnit.MILLISECONDS)
.build();
return perMonitoringClassFields;
}
}

// Usage stats for this RG obtained from other brokers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,27 @@ public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
}
}

/**
* It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
*
* @param numberOfMessages
* @param byteSize
*/
public boolean tryAcquire(long numberOfMessages, long byteSize) {
if (numberOfMessages > 0 && dispatchRateLimiterOnMessage != null) {
if (!dispatchRateLimiterOnMessage.tryAcquire(numberOfMessages)) {
return false;
}
}
if (byteSize > 0 && dispatchRateLimiterOnByte != null) {
if (!dispatchRateLimiterOnByte.tryAcquire(byteSize)) {
return false;
}
}

return true;
}

/**
* Checks if dispatch-rate limiting is enabled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
Expand Down Expand Up @@ -60,7 +62,7 @@ public class ResourceGroupService implements AutoCloseable{
public ResourceGroupService(PulsarService pulsar) {
this.pulsar = pulsar;
this.timeUnitScale = TimeUnit.SECONDS;
this.quotaCalculator = new ResourceQuotaCalculatorImpl();
this.quotaCalculator = new ResourceQuotaCalculatorImpl(pulsar);
this.resourceUsageTransportManagerMgr = pulsar.getResourceUsageTransportManager();
this.rgConfigListener = new ResourceGroupConfigListener(this, pulsar);
this.initialize();
Expand Down Expand Up @@ -360,8 +362,9 @@ public void close() throws Exception {
resourceGroupsMap.clear();
tenantToRGsMap.clear();
namespaceToRGsMap.clear();
topicProduceStats.clear();
topicConsumeStats.clear();
topicProduceStats.invalidateAll();
topicConsumeStats.invalidateAll();
replicationDispatchStats.invalidateAll();
}

private void incrementUsage(ResourceGroup resourceGroup,
Expand Down Expand Up @@ -498,7 +501,7 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin
protected void updateStatsWithDiff(String topicName, String replicationRemoteCluster, String tenantString,
String nsString, long accByteCount, long accMsgCount,
ResourceGroupMonitoringClass monClass) {
ConcurrentHashMap<String, BytesAndMessagesCount> hm;
Cache<String, BytesAndMessagesCount> hm;
switch (monClass) {
default:
log.error("updateStatsWithDiff: Unknown monitoring class={}; ignoring", monClass);
Expand Down Expand Up @@ -530,7 +533,7 @@ protected void updateStatsWithDiff(String topicName, String replicationRemoteClu
} else {
key = topicName;
}
bmOldCount = hm.get(key);
bmOldCount = hm.getIfPresent(key);
if (bmOldCount == null) {
bmDiff.bytes = bmNewCount.bytes;
bmDiff.messages = bmNewCount.messages;
Expand Down Expand Up @@ -650,8 +653,8 @@ protected void aggregateResourceGroupLocalUsages() {

topicStats.getReplication().forEach((remoteCluster, stats) -> {
this.updateStatsWithDiff(topicName, remoteCluster, tenantString, nsString,
(long) stats.getMsgThroughputOut(),
(long) stats.getMsgRateOut(),
stats.getBytesOutCounter(),
stats.getMsgOutCounter(),
ResourceGroupMonitoringClass.ReplicationDispatch
);
});
Expand Down Expand Up @@ -779,8 +782,6 @@ protected void calculateQuotaForAllResourceGroups() {
newPeriodInSeconds,
timeUnitScale);
this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds;
maxIntervalForSuppressingReportsMSecs =
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
}
}

Expand All @@ -798,9 +799,11 @@ private void initialize() {
periodInSecs,
periodInSecs,
this.timeUnitScale);
maxIntervalForSuppressingReportsMSecs =
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;

long resourceUsagePublishPeriodInMS = TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds);
long statsCacheInMS = resourceUsagePublishPeriodInMS * 2;
topicProduceStats = newStatsCache(statsCacheInMS);
topicConsumeStats = newStatsCache(statsCacheInMS);
replicationDispatchStats = newStatsCache(statsCacheInMS);
}

private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
Expand All @@ -819,6 +822,13 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
}
}

@VisibleForTesting
protected Cache<String, BytesAndMessagesCount> newStatsCache(long durationMS) {
return Caffeine.newBuilder()
.expireAfterAccess(durationMS, TimeUnit.MILLISECONDS)
.build();
}

private static final Logger log = LoggerFactory.getLogger(ResourceGroupService.class);

@Getter
Expand All @@ -841,10 +851,9 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
private ConcurrentHashMap<TopicName, ResourceGroup> topicToRGsMap = new ConcurrentHashMap<>();

// Maps to maintain the usage per topic, in produce/consume directions.
private ConcurrentHashMap<String, BytesAndMessagesCount> topicProduceStats = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, BytesAndMessagesCount> topicConsumeStats = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, BytesAndMessagesCount> replicationDispatchStats = new ConcurrentHashMap<>();

private Cache<String, BytesAndMessagesCount> topicProduceStats;
private Cache<String, BytesAndMessagesCount> topicConsumeStats;
private Cache<String, BytesAndMessagesCount> replicationDispatchStats;

// The task that periodically re-calculates the quota budget for local usage.
private ScheduledFuture<?> aggregateLocalUsagePeriodicTask;
Expand All @@ -857,22 +866,6 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
// Allow a pluggable scale on time units; for testing periodic functionality.
private TimeUnit timeUnitScale;

// The maximum number of successive rounds that we can suppress reporting local usage, because there was no
// substantial change from the prior round. This is to ensure the reporting does not become too chatty.
// Set this value to one more than the cadence of sending reports; e.g., if you want to send every 3rd report,
// set the value to 4.
// Setting this to 0 will make us report in every round.
// Don't set to negative values; behavior will be "undefined".
protected static final int MaxUsageReportSuppressRounds = 5;

// Convenient shorthand, for MaxUsageReportSuppressRounds converted to a time interval in milliseconds.
protected static long maxIntervalForSuppressingReportsMSecs;

// The percentage difference that is considered "within limits" to suppress usage reporting.
// Setting this to 0 will also make us report in every round.
// Don't set it to negative values; behavior will be "undefined".
protected static final float UsageReportSuppressionTolerancePercentage = 5;

// Labels for the various counters used here.
private static final String[] resourceGroupLabel = {"ResourceGroup"};
private static final String[] resourceGroupMonitoringclassLabels = {"ResourceGroup", "MonitoringClass"};
Expand Down Expand Up @@ -953,15 +946,20 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
.register();

@VisibleForTesting
ConcurrentHashMap getTopicConsumeStats() {
Cache<String, BytesAndMessagesCount> getTopicConsumeStats() {
return this.topicConsumeStats;
}

@VisibleForTesting
ConcurrentHashMap getTopicProduceStats() {
Cache<String, BytesAndMessagesCount> getTopicProduceStats() {
return this.topicProduceStats;
}

@VisibleForTesting
Cache<String, BytesAndMessagesCount> getReplicationDispatchStats() {
return this.replicationDispatchStats;
}

@VisibleForTesting
ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
return this.aggregateLocalUsagePeriodicTask;
Expand Down
Loading

0 comments on commit e3b2d33

Please sign in to comment.