Skip to content

Commit

Permalink
[feat][broker] Add ResourceGroup-based dispatch rate limits to the Topic
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Mar 6, 2024
1 parent 1c40913 commit 80f227c
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ protected ResourceGroup(ResourceGroupService rgs, String name,
this.getResourceGroupPublishLimiter());
this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager
.newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager
.newDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
}

// ctor for overriding the transport-manager fill/set buffer.
Expand All @@ -103,6 +105,8 @@ protected ResourceGroup(ResourceGroupService rgs, String rgName,
this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager
.newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager
.newDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.ruPublisher = rgPublisher;
this.ruConsumer = rgConsumer;
}
Expand All @@ -114,6 +118,7 @@ public ResourceGroup(ResourceGroup other) {
this.rgs = other.rgs;
this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter;
this.resourceGroupReplicationDispatchLimiter = other.resourceGroupReplicationDispatchLimiter;
this.resourceGroupDispatchLimiter = other.resourceGroupDispatchLimiter;
this.setResourceGroupMonitoringClassFields();

// ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer from other, if required.
Expand Down Expand Up @@ -153,7 +158,9 @@ protected void updateResourceGroup(org.apache.pulsar.common.policies.data.Resour
pubBmc.messages = rgConfig.getPublishRateInMsgs();
pubBmc.bytes = rgConfig.getPublishRateInBytes();
this.resourceGroupPublishLimiter.update(pubBmc);
ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, rgConfig);
ResourceGroupRateLimiterManager
.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, rgConfig);
ResourceGroupRateLimiterManager.updateDispatchRateLimiter(resourceGroupDispatchLimiter, rgConfig);
}

protected long getResourceGroupNumOfNSRefs() {
Expand Down Expand Up @@ -384,11 +391,15 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo
monEntity.quotaForNextPeriod = newQuota;
switch (monClass) {
case ReplicationDispatch:
ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, newQuota);
ResourceGroupRateLimiterManager
.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, newQuota);
break;
case Publish:
this.resourceGroupPublishLimiter.update(newQuota);
break;
case Dispatch:
ResourceGroupRateLimiterManager.updateDispatchRateLimiter(resourceGroupDispatchLimiter, newQuota);
break;
default:
if (log.isDebugEnabled()) {
log.debug("Doing nothing for monClass={};", monClass);
Expand Down Expand Up @@ -601,9 +612,11 @@ private void setResourceGroupConfigParameters(org.apache.pulsar.common.policies.
? -1 : rgConfig.getDispatchRateInMsgs();

idx = ResourceGroupMonitoringClass.ReplicationDispatch.ordinal();
this.monitoringClassFields[idx].configValuesPerPeriod.bytes = rgConfig.getReplicationDispatchRateInBytes() == null
this.monitoringClassFields[idx]
.configValuesPerPeriod.bytes = rgConfig.getReplicationDispatchRateInBytes() == null
? -1 : rgConfig.getReplicationDispatchRateInBytes();
this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getReplicationDispatchRateInMsgs() == null
this.monitoringClassFields[idx]
.configValuesPerPeriod.messages = rgConfig.getReplicationDispatchRateInMsgs() == null
? -1 : rgConfig.getReplicationDispatchRateInMsgs();
}

Expand Down Expand Up @@ -684,7 +697,7 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
protected ResourceGroupDispatchLimiter resourceGroupReplicationDispatchLimiter;

@Getter
protected ResourceGroupDispatchLimiter resourceGroupTopicDispatchLimiter;
protected ResourceGroupDispatchLimiter resourceGroupDispatchLimiter;

protected static class PerMonitoringClassFields {
// This lock covers all the "local" counts (i.e., except for the per-broker usage stats).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -28,7 +28,8 @@ public class ResourceGroupDispatchLimiter implements AutoCloseable {
private volatile RateLimiter dispatchRateLimiterOnMessage;
private volatile RateLimiter dispatchRateLimiterOnByte;

public ResourceGroupDispatchLimiter(ScheduledExecutorService executorService, long dispatchRateInMsgs, long dispatchRateInBytes) {
public ResourceGroupDispatchLimiter(ScheduledExecutorService executorService,
long dispatchRateInMsgs, long dispatchRateInBytes) {
this.executorService = executorService;
update(dispatchRateInMsgs, dispatchRateInBytes);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -44,4 +44,24 @@ static void updateReplicationDispatchRateLimiter(ResourceGroupDispatchLimiter re
BytesAndMessagesCount quota) {
resourceGroupDispatchLimiter.update(quota.messages, quota.bytes);
}

static ResourceGroupDispatchLimiter newDispatchRateLimiter(
org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup,
ScheduledExecutorService executorService) {
long msgs = Optional.ofNullable(resourceGroup.getDispatchRateInMsgs()).orElse(-1);
long bytes = Optional.ofNullable(resourceGroup.getDispatchRateInBytes()).orElse(-1L);
return new ResourceGroupDispatchLimiter(executorService, msgs, bytes);
}

static void updateDispatchRateLimiter(ResourceGroupDispatchLimiter resourceGroupDispatchLimiter,
org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup) {
long msgs = Optional.ofNullable(resourceGroup.getDispatchRateInMsgs()).orElse(-1);
long bytes = Optional.ofNullable(resourceGroup.getDispatchRateInBytes()).orElse(-1L);
resourceGroupDispatchLimiter.update(msgs, bytes);
}

static void updateDispatchRateLimiter(ResourceGroupDispatchLimiter resourceGroupDispatchLimiter,
BytesAndMessagesCount quota) {
resourceGroupDispatchLimiter.update(quota.messages, quota.bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,9 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin
// Find the difference between the last time stats were updated for this topic, and the current
// time. If the difference is positive, update the stats.
@VisibleForTesting
protected void updateStatsWithDiff(String topicName, String replicationRemoteCluster, String tenantString, String nsString,
long accByteCount, long accMesgCount, ResourceGroupMonitoringClass monClass) {
protected void updateStatsWithDiff(String topicName, String replicationRemoteCluster, String tenantString,
String nsString, long accByteCount, long accMesgCount,
ResourceGroupMonitoringClass monClass) {
ConcurrentHashMap<String, BytesAndMessagesCount> hm;
switch (monClass) {
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -120,6 +121,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP

protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;

@Getter
protected volatile Optional<ResourceGroupDispatchLimiter> resourceGroupDispatchRateLimiter = Optional.empty();

protected boolean preciseTopicPublishRateLimitingEnable;

@Getter
Expand Down Expand Up @@ -1153,6 +1157,7 @@ public void updateResourceGroupLimiter() {
this.resourceGroupRateLimitingEnabled = true;
this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter();
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead);
this.resourceGroupDispatchRateLimiter = Optional.of(resourceGroup.getResourceGroupDispatchLimiter());
log.info("Using resource group {} rate limiter for topic {}", rgName, topic);
}
} else {
Expand All @@ -1166,6 +1171,7 @@ public void updateResourceGroupLimiter() {
protected void closeResourceGroupLimiter() {
if (resourceGroupRateLimitingEnabled) {
this.resourceGroupPublishLimiter = null;
this.resourceGroupDispatchRateLimiter = Optional.empty();
this.resourceGroupRateLimitingEnabled = false;
brokerService.getPulsar().getResourceGroupServiceManager().unRegisterTopic(TopicName.get(topic));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
Expand Down Expand Up @@ -286,6 +287,10 @@ default Optional<DispatchRateLimiter> getDispatchRateLimiter() {
return Optional.empty();
}

default Optional<ResourceGroupDispatchLimiter> getResourceGroupDispatchRateLimiter() {
return Optional.empty();
}

default Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -398,6 +399,27 @@ protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits)
bytesToRead = calculateToRead.getRight();
}
}

if (topic.getResourceGroupDispatchRateLimiter().isPresent()) {
ResourceGroupDispatchLimiter limiter = topic.getResourceGroupDispatchRateLimiter().get();
long availableDispatchRateLimitOnMsg = limiter.getAvailableDispatchRateLimitOnMsg();
long availableDispatchRateLimitOnByte = limiter.getAvailableDispatchRateLimitOnByte();
if (availableDispatchRateLimitOnMsg == 0 || availableDispatchRateLimitOnByte == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] message-read exceeded resourcegroup message-rate {}/{}, schedule after a {}",
name, limiter.getDispatchRateOnMsg(), limiter.getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
reScheduleRead();
return Pair.of(-1, -1L);
} else {
Pair<Integer, Long> calculateToRead =
computeReadLimits(messagesToRead, (int) availableDispatchRateLimitOnMsg, bytesToRead,
availableDispatchRateLimitOnByte);
messagesToRead = calculateToRead.getLeft();
bytesToRead = calculateToRead.getRight();
}
}
}

if (havePendingReplayRead) {
Expand Down Expand Up @@ -658,6 +680,12 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}

Optional<ResourceGroupDispatchLimiter> resourceGroupDispatchRateLimiter =
topic.getResourceGroupDispatchRateLimiter();
if (resourceGroupDispatchRateLimiter.isPresent()) {
resourceGroupDispatchRateLimiter.get().consumeDispatchQuota(permits, totalBytesSent);
}
}

if (entriesToDispatch > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
Expand Down Expand Up @@ -237,6 +238,10 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes()));
topic.getResourceGroupDispatchRateLimiter().ifPresent(resourceGroupDispatchLimiter ->
resourceGroupDispatchLimiter.consumeDispatchQuota(permits,
sendMessageInfo.getTotalBytes())
);
}

// Schedule a new read batch operation only after the previous batch has been written to the socket.
Expand Down Expand Up @@ -469,6 +474,30 @@ protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
bytesToRead = calculateToRead.getRight();
}
}

if (topic.getResourceGroupDispatchRateLimiter().isPresent()) {
ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = topic.getResourceGroupDispatchRateLimiter()
.get();
long availableDispatchRateLimitOnMsg =
resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg();
long availableDispatchRateLimitOnByte =
resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte();
if (availableDispatchRateLimitOnMsg == 0 || availableDispatchRateLimitOnByte == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] message-read exceeded resourcegroup message-rate {}/{}, schedule after a {}",
name, availableDispatchRateLimitOnMsg, availableDispatchRateLimitOnByte,
MESSAGE_RATE_BACKOFF_MS);
}
reScheduleRead();
return Pair.of(-1, -1L);
} else {
Pair<Integer, Long> calculateToRead =
computeReadLimits(messagesToRead, (int) availableDispatchRateLimitOnMsg, bytesToRead,
availableDispatchRateLimitOnByte);
messagesToRead = calculateToRead.getLeft();
bytesToRead = calculateToRead.getRight();
}
}
}

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -38,6 +39,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
Expand Down Expand Up @@ -313,6 +315,12 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}

Optional<ResourceGroupDispatchLimiter> resourceGroupDispatchRateLimiter =
topic.getResourceGroupDispatchRateLimiter();
if (resourceGroupDispatchRateLimiter.isPresent()) {
resourceGroupDispatchRateLimiter.get().consumeDispatchQuota(permits, totalBytesSent);
}
}

if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down

0 comments on commit 80f227c

Please sign in to comment.