From 6556f43e3fc47b395ddbea1b196d755b5d946c18 Mon Sep 17 00:00:00 2001
From: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com>
Date: Wed, 3 Apr 2024 21:01:49 -0700
Subject: [PATCH 1/7] [fix][broker] Update TransferShedder underloaded broker
check to consider max loaded broker's msgThroughputEMA and update
IsExtensibleLoadBalancerImpl check (#22321) (#22417)
(cherry picked from commit 651908a903301da9c07dc93300635cc28d8ee69f)
---
.../apache/pulsar/broker/PulsarService.java | 8 ++---
.../broker/admin/impl/NamespacesBase.java | 4 +--
.../extensions/ExtensibleLoadManagerImpl.java | 4 ---
.../extensions/scheduler/TransferShedder.java | 22 +++++++++-----
.../broker/namespace/NamespaceService.java | 30 +++++++++----------
.../pulsar/broker/web/PulsarWebResource.java | 4 +--
.../ExtensibleLoadManagerImplTest.java | 1 -
.../scheduler/TransferShedderTest.java | 13 ++++----
8 files changed, 45 insertions(+), 41 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 64f4ee0288122..5410bacbe78eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -379,7 +379,7 @@ public void closeMetadataServiceSession() throws Exception {
}
private void closeLeaderElectionService() throws Exception {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close();
} else {
if (this.leaderElectionService != null) {
@@ -1135,7 +1135,7 @@ protected void closeLocalMetadataStore() throws Exception {
}
protected void startLeaderElectionService() {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
return;
}
@@ -1250,7 +1250,7 @@ protected void startLoadManagementService() throws PulsarServerException {
LOG.info("Starting load management service ...");
this.loadManager.get().start();
- if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
@@ -1343,7 +1343,7 @@ public boolean isRunning() {
* @return a reference of the current LeaderElectionService
instance.
*/
public LeaderElectionService getLeaderElectionService() {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
return ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService();
} else {
return this.leaderElectionService;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 9478857032f22..f4732cad38040 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -968,13 +968,13 @@ public CompletableFuture setNamespaceBundleAffinityAsync(String bundleRang
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
return CompletableFuture.completedFuture(null);
}
return validateLeaderBrokerAsync();
})
.thenAccept(__ -> {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
return;
}
// For ExtensibleLoadManager, this operation will be ignored.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 6a0e677c66268..26ee45b7444f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -263,10 +263,6 @@ public ExtensibleLoadManagerImpl() {
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
}
- public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
- return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
- }
-
public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 3564b4e9e3b94..7126ccb034196 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -362,7 +362,7 @@ public Set findBundlesForUnloading(LoadManagerContext context,
final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
boolean transfer = conf.isLoadBalancerTransferEnabled();
if (stats.std() > targetStd
- || isUnderLoaded(context, stats.peekMinBroker(), stats.avg)
+ || isUnderLoaded(context, stats.peekMinBroker(), stats)
|| isOverLoaded(context, stats.peekMaxBroker(), stats.avg)) {
unloadConditionHitCount++;
} else {
@@ -390,7 +390,7 @@ public Set findBundlesForUnloading(LoadManagerContext context,
UnloadDecision.Reason reason;
if (stats.std() > targetStd) {
reason = Overloaded;
- } else if (isUnderLoaded(context, stats.peekMinBroker(), stats.avg)) {
+ } else if (isUnderLoaded(context, stats.peekMinBroker(), stats)) {
reason = Underloaded;
if (debugMode) {
log.info(String.format("broker:%s is underloaded:%s although "
@@ -669,19 +669,27 @@ public Set findBundlesForUnloading(LoadManagerContext context,
}
- private boolean isUnderLoaded(LoadManagerContext context, String broker, double avgLoad) {
+ private boolean isUnderLoaded(LoadManagerContext context, String broker, LoadStats stats) {
var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
if (brokerLoadDataOptional.isEmpty()) {
return false;
}
var brokerLoadData = brokerLoadDataOptional.get();
- if (brokerLoadData.getMsgThroughputEMA() < 1) {
+
+ var underLoadedMultiplier =
+ Math.min(0.5, Math.max(0.0, context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2.0));
+
+ if (brokerLoadData.getWeightedMaxEMA() < stats.avg * underLoadedMultiplier) {
return true;
}
- return brokerLoadData.getWeightedMaxEMA()
- < avgLoad * Math.min(0.5, Math.max(0.0,
- context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2));
+ var maxBrokerLoadDataOptional = context.brokerLoadDataStore().get(stats.peekMaxBroker());
+ if (maxBrokerLoadDataOptional.isEmpty()) {
+ return false;
+ }
+
+ return brokerLoadData.getMsgThroughputEMA()
+ < maxBrokerLoadDataOptional.get().getMsgThroughputEMA() * underLoadedMultiplier;
}
private boolean isOverLoaded(LoadManagerContext context, String broker, double avgLoad) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 61e045ed304fd..e04be25fe499c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -195,7 +195,7 @@ public CompletableFuture> getBrokerServiceUrlAsync(TopicN
pulsar.getBrokerId(), optResult.get(), topic);
return CompletableFuture.completedFuture(optResult);
}
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
} else {
// TODO: Add unit tests cover it.
@@ -311,7 +311,7 @@ private CompletableFuture> internalGetWebServiceUrl(@Nullable Serv
return CompletableFuture.completedFuture(Optional.empty());
}
CompletableFuture> future =
- ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
+ ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) :
findBrokerServiceUrl(bundle, options);
@@ -375,7 +375,7 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
NamespaceBundle nsFullBundle = bundleFactory.getFullBundle(nsname);
// v2 namespace will always use full bundle object
final NamespaceEphemeralData otherData;
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get();
} else {
@@ -781,7 +781,7 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle,
long timeout,
TimeUnit timeoutUnit,
boolean closeWithoutWaitingClientDisconnect) {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
}
@@ -803,7 +803,7 @@ public CompletableFuture