diff --git a/bin/pulsar b/bin/pulsar index 7c2a75aad665f..8f8d77ed1f74d 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -322,8 +322,6 @@ if [[ -z "$IS_JAVA_8" ]]; then OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED" # MBeanStatsGenerator OPTS="$OPTS --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED" - # LinuxInfoUtils - OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED" fi ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true -Dzookeeper.tcpKeepAlive=true" diff --git a/pom.xml b/pom.xml index a0eb46683990e..4902bb4d27652 100644 --- a/pom.xml +++ b/pom.xml @@ -1921,7 +1921,6 @@ flexible messaging model and an intuitive client API. --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED - --add-opens java.base/jdk.internal.platform=ALL-UNNAMED diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java deleted file mode 100644 index 7e0d0dcfda1cc..0000000000000 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker; - -public enum BitRateUnit { - - Bit { - public double toBit(double bitRate) { - return bitRate; - } - - public double toKilobit(double bitRate) { - return bitRate / C0; - } - - public double toMegabit(double bitRate) { - return bitRate / Math.pow(C0, 2); - } - - public double toGigabit(double bitRate) { - return bitRate / Math.pow(C0, 3); - } - - public double toByte(double bitRate) { - return bitRate / C1; - } - - public double convert(double bitRate, BitRateUnit bitRateUnit) { - return bitRateUnit.toBit(bitRate); - } - }, - Kilobit { - public double toBit(double bitRate) { - return bitRate * C0; - } - - public double toKilobit(double bitRate) { - return bitRate; - } - - public double toMegabit(double bitRate) { - return bitRate / C0; - } - - public double toGigabit(double bitRate) { - return bitRate / Math.pow(C0, 2); - } - - public double toByte(double bitRate) { - return bitRate * C0 / C1; - } - - public double convert(double bitRate, BitRateUnit bitRateUnit) { - return bitRateUnit.toKilobit(bitRate); - } - }, - Megabit { - public double toBit(double bitRate) { - return bitRate * Math.pow(C0, 2); - } - - public double toKilobit(double bitRate) { - return bitRate * C0; - } - - public double toMegabit(double bitRate) { - return bitRate; - } - - public double toGigabit(double bitRate) { - return bitRate / C0; - } - - public double toByte(double bitRate) { - return bitRate * Math.pow(C0, 2) / C1; - } - - public double convert(double bitRate, BitRateUnit bitRateUnit) { - return bitRateUnit.toMegabit(bitRate); - } - }, - Gigabit { - public double toBit(double bitRate) { - return bitRate * Math.pow(C0, 3); - } - - public double toKilobit(double bitRate) { - return bitRate * Math.pow(C0, 2); - } - - public double toMegabit(double bitRate) { - return bitRate * C0; - } - - public double toGigabit(double bitRate) { - return bitRate; - } - - public double toByte(double bitRate) { - return bitRate * Math.pow(C0, 3) / C1; - } - - public double convert(double bitRate, BitRateUnit bitRateUnit) { - return bitRateUnit.toGigabit(bitRate); - } - }, - Byte { - public double toBit(double bitRate) { - return bitRate * C1; - } - - public double toKilobit(double bitRate) { - return bitRate * C1 / C0; - } - - public double toMegabit(double bitRate) { - return bitRate * C1 / Math.pow(C0, 2); - } - - public double toGigabit(double bitRate) { - return bitRate * C1 / Math.pow(C0, 3); - } - - public double toByte(double bitRate) { - return bitRate; - } - - public double convert(double bitRate, BitRateUnit bitRateUnit) { - return bitRateUnit.toByte(bitRate); - } - }; - - static final int C0 = 1000; - static final int C1 = 8; - - public double toBit(double bitRate) { - throw new AbstractMethodError(); - } - - public double toKilobit(double bitRate) { - throw new AbstractMethodError(); - } - - public double toMegabit(double bitRate) { - throw new AbstractMethodError(); - } - - public double toGigabit(double bitRate) { - throw new AbstractMethodError(); - } - - public double toByte(double bitRate) { - throw new AbstractMethodError(); - } - - public double convert(double bitRate, BitRateUnit bitRateUnit) { - throw new AbstractMethodError(); - } -} diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java deleted file mode 100644 index 14adcc7027334..0000000000000 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker; - -import static org.testng.Assert.assertEquals; -import org.testng.annotations.Test; - -public class BitRateUnitTest { - - @Test - public void testBps() { - double bps = 1231434.12; - assertEquals(BitRateUnit.Bit.toBit(bps), bps); - assertEquals(BitRateUnit.Bit.toByte(bps), bps / 8); - assertEquals(BitRateUnit.Bit.toKilobit(bps), bps / 1000); - assertEquals(BitRateUnit.Bit.toMegabit(bps), bps / 1000 / 1000); - assertEquals(BitRateUnit.Bit.toGigabit(bps), bps / 1000 / 1000 / 1000); - } - - @Test - public void testKbps() { - double kbps = 1231434.12; - assertEquals(BitRateUnit.Kilobit.toBit(kbps), kbps * 1000); - assertEquals(BitRateUnit.Kilobit.toByte(kbps), kbps * 1000 / 8); - assertEquals(BitRateUnit.Kilobit.toKilobit(kbps), kbps); - assertEquals(BitRateUnit.Kilobit.toMegabit(kbps), kbps / 1000); - assertEquals(BitRateUnit.Kilobit.toGigabit(kbps), kbps / 1000 / 1000); - } - - @Test - public void testMbps() { - double mbps = 1231434.12; - assertEquals(BitRateUnit.Megabit.toBit(mbps), mbps * 1000 * 1000); - assertEquals(BitRateUnit.Megabit.toByte(mbps), mbps * 1000 * 1000 / 8); - assertEquals(BitRateUnit.Megabit.toKilobit(mbps), mbps * 1000); - assertEquals(BitRateUnit.Megabit.toMegabit(mbps), mbps); - assertEquals(BitRateUnit.Megabit.toGigabit(mbps), mbps / 1000); - } - - @Test - public void testGbps() { - double gbps = 1231434.12; - assertEquals(BitRateUnit.Gigabit.toBit(gbps),gbps * 1000 * 1000 * 1000 ); - assertEquals(BitRateUnit.Gigabit.toByte(gbps), gbps * 1000 * 1000 * 1000 / 8); - assertEquals(BitRateUnit.Gigabit.toKilobit(gbps), gbps * 1000 * 1000); - assertEquals(BitRateUnit.Gigabit.toMegabit(gbps), gbps * 1000); - assertEquals(BitRateUnit.Gigabit.toGigabit(gbps), gbps); - } - - @Test - public void testByte() { - double bytes = 1231434.12; - assertEquals(BitRateUnit.Byte.toBit(bytes), bytes * 8); - assertEquals(BitRateUnit.Byte.toByte(bytes), bytes); - assertEquals(BitRateUnit.Byte.toKilobit(bytes), bytes / 1000 * 8); - assertEquals(BitRateUnit.Byte.toMegabit(bytes), bytes / 1000 / 1000 * 8); - assertEquals(BitRateUnit.Byte.toGigabit(bytes), bytes / 1000 / 1000 / 1000 * 8); - } - - - @Test - public void testConvert() { - double unit = 12334125.1234; - assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toBit(unit)); - assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toBit(unit)); - assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toBit(unit)); - assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toBit(unit)); - assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toBit(unit)); - - assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toKilobit(unit)); - assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toKilobit(unit)); - assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toKilobit(unit)); - assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toKilobit(unit)); - assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toKilobit(unit)); - - assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toMegabit(unit)); - assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toMegabit(unit)); - assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toMegabit(unit)); - assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toMegabit(unit)); - assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toMegabit(unit)); - - assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toGigabit(unit)); - assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toGigabit(unit)); - assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toGigabit(unit)); - assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toGigabit(unit)); - assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toGigabit(unit)); - - assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toByte(unit)); - assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toByte(unit)); - assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toByte(unit)); - assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toByte(unit)); - assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toByte(unit)); - } -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 7dd28df8a6466..c75f68129f59b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -85,7 +85,6 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.intercept.BrokerInterceptors; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; -import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; @@ -662,14 +661,6 @@ public void start() throws PulsarServerException { + "authenticationEnabled=true when authorization is enabled with authorizationEnabled=true."); } - if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent() - && config.isLoadBalancerEnabled() - && LinuxInfoUtils.isLinux() - && !LinuxInfoUtils.checkHasNicSpeeds()) { - throw new IllegalStateException("Unable to read VM NIC speed. You must set " - + "[loadBalancerOverrideBrokerNicSpeedGbps] to override it when load balancer is enabled."); - } - localMetadataStore = createLocalMetadataStore(); localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java deleted file mode 100644 index 2a423045b1bc4..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java +++ /dev/null @@ -1,369 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.loadbalance; - -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.lang.reflect.Method; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Locale; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.SystemUtils; -import org.apache.pulsar.broker.BitRateUnit; - -@Slf4j -public class LinuxInfoUtils { - - // CGROUP - private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage"; - private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; - private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; - - // proc states - private static final String PROC_STAT_PATH = "/proc/stat"; - private static final String NIC_PATH = "/sys/class/net/"; - // NIC type - private static final int ARPHRD_ETHER = 1; - private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed"; - - private static Object /*jdk.internal.platform.Metrics*/ metrics; - private static Method getMetricsProviderMethod; - private static Method getCpuQuotaMethod; - private static Method getCpuPeriodMethod; - private static Method getCpuUsageMethod; - - static { - try { - metrics = Class.forName("jdk.internal.platform.Container").getMethod("metrics") - .invoke(null); - if (metrics != null) { - getMetricsProviderMethod = metrics.getClass().getMethod("getProvider"); - getMetricsProviderMethod.setAccessible(true); - getCpuQuotaMethod = metrics.getClass().getMethod("getCpuQuota"); - getCpuQuotaMethod.setAccessible(true); - getCpuPeriodMethod = metrics.getClass().getMethod("getCpuPeriod"); - getCpuPeriodMethod.setAccessible(true); - getCpuUsageMethod = metrics.getClass().getMethod("getCpuUsage"); - getCpuUsageMethod.setAccessible(true); - } - } catch (Throwable e) { - log.warn("Failed to get runtime metrics", e); - } - } - - /** - * Determine whether the OS is the linux kernel. - * @return Whether the OS is the linux kernel - */ - public static boolean isLinux() { - return SystemUtils.IS_OS_LINUX; - } - - /** - * Determine whether the OS enable CG Group. - */ - public static boolean isCGroupEnabled() { - try { - if (metrics == null) { - return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH)); - } - String provider = (String) getMetricsProviderMethod.invoke(metrics); - log.info("[LinuxInfo] The system metrics provider is: {}", provider); - return provider.contains("cgroup"); - } catch (Exception e) { - log.warn("[LinuxInfo] Failed to check cgroup CPU: {}", e.getMessage()); - return false; - } - } - - /** - * Get total cpu limit. - * @param isCGroupsEnabled Whether CGroup is enabled - * @return Total cpu limit - */ - public static double getTotalCpuLimit(boolean isCGroupsEnabled) { - if (isCGroupsEnabled) { - try { - long quota; - long period; - if (metrics != null && getCpuQuotaMethod != null && getCpuPeriodMethod != null) { - quota = (long) getCpuQuotaMethod.invoke(metrics); - period = (long) getCpuPeriodMethod.invoke(metrics); - } else { - quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH)); - period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH)); - } - - if (quota > 0) { - return 100.0 * quota / period; - } - } catch (Exception e) { - log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup", e); - // Fallback to availableProcessors - } - } - // Fallback to JVM reported CPU quota - return 100 * Runtime.getRuntime().availableProcessors(); - } - - /** - * Get CGroup cpu usage. - * @return Cpu usage - */ - public static long getCpuUsageForCGroup() { - try { - if (metrics != null && getCpuUsageMethod != null) { - return (long) getCpuUsageMethod.invoke(metrics); - } - return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH)); - } catch (Exception e) { - log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e); - return -1; - } - } - - - /** - * Reads first line of /proc/stat to get total cpu usage. - * - *
-     *     cpu  user   nice system idle    iowait irq softirq steal guest guest_nice
-     *     cpu  317808 128  58637  2503692 7634   0   13472   0     0     0
-     * 
- *

- * Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this - * far. Real CPU usage should equal the sum subtracting the idle cycles, this would include iowait, irq and steal. - */ - public static ResourceUsage getCpuUsageForEntireHost() { - try (Stream stream = Files.lines(Paths.get(PROC_STAT_PATH))) { - Optional first = stream.findFirst(); - if (!first.isPresent()) { - log.error("[LinuxInfo] Failed to read CPU usage from /proc/stat, because of empty values."); - return ResourceUsage.empty(); - } - String[] words = first.get().split("\\s+"); - long total = Arrays.stream(words) - .filter(s -> !s.contains("cpu")) - .mapToLong(Long::parseLong) - .sum(); - long idle = Long.parseLong(words[4]); - return ResourceUsage.builder() - .usage(total - idle) - .idle(idle) - .total(total).build(); - } catch (IOException e) { - log.error("[LinuxInfo] Failed to read CPU usage from /proc/stat", e); - return ResourceUsage.empty(); - } - } - - /** - * Determine whether the VM has physical nic. - * @param nicPath Nic path - * @return whether The VM has physical nic. - */ - private static boolean isPhysicalNic(Path nicPath) { - try { - if (nicPath.toString().contains("/virtual/")) { - return false; - } - // Check the type to make sure it's ethernet (type "1") - String type = readTrimStringFromFile(nicPath.resolve("type")); - // wireless NICs don't report speed, ignore them. - return Integer.parseInt(type) == ARPHRD_ETHER; - } catch (Exception e) { - log.warn("[LinuxInfo] Failed to read {} NIC type, the detail is: {}", nicPath, e.getMessage()); - // Read type got error. - return false; - } - } - - /** - * Determine whether nic is usable. - * @param nicPath Nic path - * @return whether nic is usable. - */ - private static boolean isUsable(Path nicPath) { - try { - String operstate = readTrimStringFromFile(nicPath.resolve("operstate")); - Operstate operState = Operstate.valueOf(operstate.toUpperCase(Locale.ROOT)); - switch (operState) { - case UP: - case UNKNOWN: - case DORMANT: - return true; - default: - return false; - } - } catch (Exception e) { - log.warn("[LinuxInfo] Failed to read {} NIC operstate, the detail is: {}", nicPath, e.getMessage()); - // Read operstate got error. - return false; - } - } - - /** - * Get all physical nic limit. - * @param nics All nic path - * @param bitRateUnit Bit rate unit - * @return Total nic limit - */ - public static double getTotalNicLimit(List nics, BitRateUnit bitRateUnit) { - return bitRateUnit.convert(nics.stream().mapToDouble(nicPath -> { - try { - return readDoubleFromFile(getReplacedNICPath(NIC_SPEED_TEMPLATE, nicPath)); - } catch (IOException e) { - log.error("[LinuxInfo] Failed to get total nic limit.", e); - return 0d; - } - }).sum(), BitRateUnit.Megabit); - } - - /** - * Get all physical nic usage. - * @param nics All nic path - * @param type Nic's usage type: transport, receive - * @param bitRateUnit Bit rate unit - * @return Total nic usage - */ - public static double getTotalNicUsage(List nics, NICUsageType type, BitRateUnit bitRateUnit) { - return bitRateUnit.convert(nics.stream().mapToDouble(nic -> { - try { - return readDoubleFromFile(getReplacedNICPath(type.template, nic)); - } catch (IOException e) { - log.error("[LinuxInfo] Failed to read {} bytes for NIC {} ", type, nic, e); - return 0d; - } - }).sum(), BitRateUnit.Byte); - } - - /** - * Get paths of all usable physical nic. - * @return All usable physical nic paths. - */ - public static List getUsablePhysicalNICs() { - try (Stream stream = Files.list(Paths.get(NIC_PATH))) { - return stream.filter(LinuxInfoUtils::isPhysicalNic) - .filter(LinuxInfoUtils::isUsable) - .map(path -> path.getFileName().toString()) - .collect(Collectors.toList()); - } catch (IOException e) { - log.error("[LinuxInfo] Failed to find NICs", e); - return Collections.emptyList(); - } - } - - /** - * Check this VM has nic speed. - * @return Whether the VM has nic speed - */ - public static boolean checkHasNicSpeeds() { - List physicalNICs = getUsablePhysicalNICs(); - if (CollectionUtils.isEmpty(physicalNICs)) { - return false; - } - double totalNicLimit = getTotalNicLimit(physicalNICs, BitRateUnit.Kilobit); - return totalNicLimit > 0; - } - - private static Path getReplacedNICPath(String template, String nic) { - return Paths.get(String.format(template, nic)); - } - - private static String readTrimStringFromFile(Path path) throws IOException { - return new String(Files.readAllBytes(path), StandardCharsets.UTF_8).trim(); - } - - private static long readLongFromFile(Path path) throws IOException { - return Long.parseLong(readTrimStringFromFile(path)); - } - - private static double readDoubleFromFile(Path path) throws IOException { - return Double.parseDouble(readTrimStringFromFile(path)); - } - - /** - * TLV IFLA_OPERSTATE - * contains RFC2863 state of the interface in numeric representation: - * See ... - */ - enum Operstate { - // Interface is in unknown state, neither driver nor userspace has set - // operational state. Interface must be considered for user data as - // setting operational state has not been implemented in every driver. - UNKNOWN, - // Interface is unable to transfer data on L1, f.e. ethernet is not - // plugged or interface is ADMIN down. - DOWN, - // Interfaces stacked on an interface that is IF_OPER_DOWN show this - // state (f.e. VLAN). - LOWERLAYERDOWN, - // Interface is L1 up, but waiting for an external event, f.e. for a - // protocol to establish. (802.1X) - DORMANT, - // Interface is operational up and can be used. - UP - } - - @VisibleForTesting - public static Object getMetrics() { - return metrics; - } - - @AllArgsConstructor - public enum NICUsageType { - // transport - TX("/sys/class/net/%s/statistics/tx_bytes"), - // receive - RX("/sys/class/net/%s/statistics/rx_bytes"); - private final String template; - } - - @Data - @Builder - public static class ResourceUsage { - private final long total; - private final long idle; - private final long usage; - - public static ResourceUsage empty() { - return ResourceUsage.builder() - .total(-1) - .idle(-1) - .usage(-1).build(); - } - - public boolean isEmpty() { - return this.total == -1 && idle == -1 && usage == -1; - } - } -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java index 9c9a00d7f5dcf..fc6c4116e0cbe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java @@ -18,27 +18,26 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.NICUsageType; -import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForCGroup; -import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForEntireHost; -import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalCpuLimit; -import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalNicLimit; -import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalNicUsage; -import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getUsablePhysicalNICs; -import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.isCGroupEnabled; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; import com.sun.management.OperatingSystemMXBean; +import java.io.IOException; import java.lang.management.ManagementFactory; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.BitRateUnit; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; -import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; @@ -55,9 +54,14 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { private double lastCpuTotalTime; private OperatingSystemMXBean systemBean; private SystemResourceUsage usage; + private final Optional overrideBrokerNicSpeedGbps; private final boolean isCGroupsEnabled; + private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage"; + private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; + private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; + public LinuxBrokerHostUsageImpl(PulsarService pulsar) { this( pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(), @@ -73,7 +77,15 @@ public LinuxBrokerHostUsageImpl(int hostUsageCheckIntervalMin, this.lastCollection = 0L; this.usage = new SystemResourceUsage(); this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps; - this.isCGroupsEnabled = isCGroupEnabled(); + + boolean isCGroupsEnabled = false; + try { + isCGroupsEnabled = Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH)); + } catch (Exception e) { + log.warn("Failed to check cgroup CPU usage file: {}", e.getMessage()); + } + this.isCGroupsEnabled = isCGroupsEnabled; + // Call now to initialize values before the constructor returns calculateBrokerHostUsage(); executorService.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::calculateBrokerHostUsage), @@ -88,17 +100,19 @@ public SystemResourceUsage getBrokerHostUsage() { @Override public void calculateBrokerHostUsage() { - List nics = getUsablePhysicalNICs(); - double totalNicLimit = getTotalNicLimitWithConfiguration(nics); - double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX, BitRateUnit.Kilobit); - double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX, BitRateUnit.Kilobit); - double totalCpuLimit = getTotalCpuLimit(isCGroupsEnabled); + List nics = getNics(); + double totalNicLimit = getTotalNicLimitKbps(nics); + double totalNicUsageTx = getTotalNicUsageTxKb(nics); + double totalNicUsageRx = getTotalNicUsageRxKb(nics); + double totalCpuLimit = getTotalCpuLimit(); + long now = System.currentTimeMillis(); double elapsedSeconds = (now - lastCollection) / 1000d; if (elapsedSeconds <= 0) { log.warn("elapsedSeconds {} is not expected, skip this round of calculateBrokerHostUsage", elapsedSeconds); return; } + SystemResourceUsage usage = new SystemResourceUsage(); double cpuUsage = getTotalCpuUsage(elapsedSeconds); @@ -114,21 +128,30 @@ public void calculateBrokerHostUsage() { usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit)); usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit)); } - usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit)); lastTotalNicUsageTx = totalNicUsageTx; lastTotalNicUsageRx = totalNicUsageRx; lastCollection = System.currentTimeMillis(); this.usage = usage; + usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit)); } - @VisibleForTesting - double getTotalNicLimitWithConfiguration(List nics) { - // Use the override value as configured. Return the total max speed across all available NICs, converted - // from Gbps into Kbps - return overrideBrokerNicSpeedGbps.map(BitRateUnit.Gigabit::toKilobit) - .map(speed -> speed * nics.size()) - .orElseGet(() -> getTotalNicLimit(nics, BitRateUnit.Kilobit)); + private double getTotalCpuLimit() { + if (isCGroupsEnabled) { + try { + long quota = readLongFromFile(CGROUPS_CPU_LIMIT_QUOTA_PATH); + long period = readLongFromFile(CGROUPS_CPU_LIMIT_PERIOD_PATH); + if (quota > 0) { + return 100.0 * quota / period; + } + } catch (IOException e) { + log.warn("Failed to read CPU quotas from cgroups", e); + // Fallback to availableProcessors + } + } + + // Fallback to JVM reported CPU quota + return 100 * Runtime.getRuntime().availableProcessors(); } private double getTotalCpuUsage(double elapsedTimeSeconds) { @@ -139,13 +162,6 @@ private double getTotalCpuUsage(double elapsedTimeSeconds) { } } - private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) { - double usage = (double) getCpuUsageForCGroup(); - double currentUsage = usage - lastCpuUsage; - lastCpuUsage = usage; - return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1); - } - /** * Reads first line of /proc/stat to get total cpu usage. * @@ -155,18 +171,39 @@ private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) { * * * Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this - * far. Real CPU usage should equal the sum subtracting the idle cycles, this would include iowait, irq and steal. + * far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal. */ private double getTotalCpuUsageForEntireHost() { - LinuxInfoUtils.ResourceUsage cpuUsageForEntireHost = getCpuUsageForEntireHost(); - if (cpuUsageForEntireHost.isEmpty()) { + try (Stream stream = Files.lines(Paths.get("/proc/stat"))) { + String[] words = stream.findFirst().get().split("\\s+"); + + long total = Arrays.stream(words).filter(s -> !s.contains("cpu")).mapToLong(Long::parseLong).sum(); + long idle = Long.parseLong(words[4]); + long usage = total - idle; + + double currentUsage = (usage - lastCpuUsage) / (total - lastCpuTotalTime) * getTotalCpuLimit(); + + lastCpuUsage = usage; + lastCpuTotalTime = total; + + return currentUsage; + } catch (IOException e) { + log.error("Failed to read CPU usage from /proc/stat", e); + return -1; + } + } + + private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) { + try { + long usage = readLongFromFile(CGROUPS_CPU_USAGE_PATH); + double currentUsage = usage - lastCpuUsage; + lastCpuUsage = usage; + + return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1); + } catch (IOException e) { + log.error("Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e); return -1; } - double currentUsage = (cpuUsageForEntireHost.getUsage() - lastCpuUsage) - / (cpuUsageForEntireHost.getTotal() - lastCpuTotalTime) * getTotalCpuLimit(isCGroupsEnabled); - lastCpuUsage = cpuUsageForEntireHost.getUsage(); - lastCpuTotalTime = cpuUsageForEntireHost.getTotal(); - return currentUsage; } private ResourceUsage getMemUsage() { @@ -175,4 +212,86 @@ private ResourceUsage getMemUsage() { return new ResourceUsage(total - free, total); } + private List getNics() { + try (Stream stream = Files.list(Paths.get("/sys/class/net/"))) { + return stream.filter(this::isPhysicalNic).map(path -> path.getFileName().toString()) + .collect(Collectors.toList()); + } catch (IOException e) { + log.error("Failed to find NICs", e); + return Collections.emptyList(); + } + } + + public int getNicCount() { + return getNics().size(); + } + + private boolean isPhysicalNic(Path path) { + try { + if (path.toString().contains("/virtual/")) { + return false; + } + // Check the type to make sure it's ethernet (type "1") + String type = new String(Files.readAllBytes(path.resolve("type")), StandardCharsets.UTF_8).trim(); + // wireless NICs don't report speed, ignore them. + return Integer.parseInt(type) == 1; + } catch (Exception e) { + // Read type got error. + return false; + } + } + + private Path getNicSpeedPath(String nic) { + return Paths.get(String.format("/sys/class/net/%s/speed", nic)); + } + + private double getTotalNicLimitKbps(List nics) { + // Use the override value as configured. Return the total max speed across all available NICs, converted + // from Gbps into Kbps + return overrideBrokerNicSpeedGbps.map(aDouble -> aDouble * nics.size() * 1000 * 1000) + .orElseGet(() -> nics.stream().mapToDouble(nicPath -> { + // Nic speed is in Mbits/s, return kbits/s + try { + return Double.parseDouble(new String(Files.readAllBytes(getNicSpeedPath(nicPath)))); + } catch (IOException e) { + log.error(String.format("Failed to read speed for nic %s, maybe you can set broker" + + " config [loadBalancerOverrideBrokerNicSpeedGbps] to override it.", nicPath), e); + return 0d; + } + }).sum() * 1000); + } + + private Path getNicTxPath(String nic) { + return Paths.get(String.format("/sys/class/net/%s/statistics/tx_bytes", nic)); + } + + private Path getNicRxPath(String nic) { + return Paths.get(String.format("/sys/class/net/%s/statistics/rx_bytes", nic)); + } + + private double getTotalNicUsageRxKb(List nics) { + return nics.stream().mapToDouble(s -> { + try { + return Double.parseDouble(new String(Files.readAllBytes(getNicRxPath(s)))); + } catch (IOException e) { + log.error("Failed to read rx_bytes for NIC " + s, e); + return 0d; + } + }).sum() * 8d / 1000; + } + + private double getTotalNicUsageTxKb(List nics) { + return nics.stream().mapToDouble(s -> { + try { + return Double.parseDouble(new String(Files.readAllBytes(getNicTxPath(s)))); + } catch (IOException e) { + log.error("Failed to read tx_bytes for NIC " + s, e); + return 0d; + } + }).sum() * 8d / 1000; + } + + private static long readLongFromFile(String path) throws IOException { + return Long.parseLong(new String(Files.readAllBytes(Paths.get(path)), Charsets.UTF_8).trim()); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java index 512dc594f29fb..b9a017504e004 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java @@ -101,7 +101,6 @@ private ServiceConfiguration getConf() { conf.setDefaultNumberOfNamespaceBundles(1); conf.setMetadataStoreUrl(metadataStoreUrl); conf.setBrokerShutdownTimeoutMs(0L); - conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setWebServicePort(Optional.of(0)); conf.setNumExecutorThreadPoolSize(1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index c905f87bce181..ddb1ed4d46960 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -68,7 +68,6 @@ public void testGetWorkerService() throws Exception { configuration.setClusterName("clusterName"); configuration.setFunctionsWorkerEnabled(true); configuration.setBrokerShutdownTimeoutMs(0L); - configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); WorkerService expectedWorkerService = mock(WorkerService.class); @Cleanup PulsarService pulsarService = spy(new PulsarService(configuration, new WorkerConfig(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 0156b845ad4cf..36784372f6b46 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -78,7 +78,6 @@ void setup() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerShutdownTimeoutMs(0L); config.setClusterName("my-cluster"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java index 19c174d0edf4a..ce2acf22aecb3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java @@ -114,7 +114,6 @@ public void testPersistentList() throws Exception { /***** Start Broker 2 ******/ ServiceConfiguration conf = new ServiceConfiguration(); conf.setBrokerShutdownTimeoutMs(0L); - conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePort(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index d88a411be34cf..b61b18ffb7b8f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -181,7 +181,6 @@ protected final void internalSetupForStatsTest() throws Exception { protected void doInitConf() throws Exception { this.conf.setBrokerShutdownTimeoutMs(0L); - this.conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); this.conf.setBrokerServicePort(Optional.of(0)); this.conf.setBrokerServicePortTls(Optional.of(0)); this.conf.setAdvertisedAddress("localhost"); @@ -479,7 +478,6 @@ protected static ServiceConfiguration getDefaultConf() { configuration.setConfigurationStoreServers("localhost:3181"); configuration.setAllowAutoTopicCreationType("non-partitioned"); configuration.setBrokerShutdownTimeoutMs(0L); - configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); configuration.setBrokerServicePort(Optional.of(0)); configuration.setBrokerServicePortTls(Optional.of(0)); configuration.setWebServicePort(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index eb23d6197fc5a..9e81a3e1db968 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -111,7 +111,6 @@ void setup() throws Exception { config1.setWebServicePort(Optional.of(0)); config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config1.setBrokerShutdownTimeoutMs(0L); - config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config1.setBrokerServicePort(Optional.of(0)); config1.setFailureDomainsEnabled(true); config1.setLoadBalancerEnabled(true); @@ -133,7 +132,6 @@ void setup() throws Exception { config2.setWebServicePort(Optional.of(0)); config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config2.setBrokerShutdownTimeoutMs(0L); - config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config2.setBrokerServicePort(Optional.of(0)); config2.setFailureDomainsEnabled(true); config2.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index 3d49dd9df9036..cb01ba43e8676 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -69,7 +69,6 @@ public void anErrorShouldBeThrowBeforeLeaderElected() throws PulsarServerExcepti final String clusterName = "elect-test"; ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setWebServicePort(Optional.of(0)); config.setClusterName(clusterName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index b7d6a391fb02a..a64f28384d3fb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -125,7 +125,6 @@ void setup() throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress(localhost+i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java index a1c52e13c69cb..50bb6b37760d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java @@ -22,6 +22,7 @@ import java.util.Optional; import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -29,21 +30,17 @@ @Test(groups = "broker") public class LoadReportNetworkLimitTest extends MockedPulsarServiceBaseTest { - int usableNicCount; - - @Override - protected void doInitConf() throws Exception { - super.doInitConf(); - conf.setLoadBalancerEnabled(true); - conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(5.4)); - } + int nicCount; @BeforeClass @Override public void setup() throws Exception { + conf.setLoadBalancerEnabled(true); + conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(5.4)); super.internalSetup(); + if (SystemUtils.IS_OS_LINUX) { - usableNicCount = LinuxInfoUtils.getUsablePhysicalNICs().size(); + nicCount = new LinuxBrokerHostUsageImpl(pulsar).getNicCount(); } } @@ -60,12 +57,12 @@ public void checkLoadReportNicSpeed() throws Exception { LoadManagerReport report = admin.brokerStats().getLoadReport(); if (SystemUtils.IS_OS_LINUX) { - assertEquals(report.getBandwidthIn().limit, usableNicCount * 5.4 * 1000 * 1000, 0.0001); - assertEquals(report.getBandwidthOut().limit, usableNicCount * 5.4 * 1000 * 1000, 0.0001); + assertEquals(report.getBandwidthIn().limit, nicCount * 5.4 * 1000 * 1000); + assertEquals(report.getBandwidthOut().limit, nicCount * 5.4 * 1000 * 1000); } else { // On non-Linux system we don't report the network usage - assertEquals(report.getBandwidthIn().limit, -1.0, 0.0001); - assertEquals(report.getBandwidthOut().limit, -1.0, 0.0001); + assertEquals(report.getBandwidthIn().limit, -1.0); + assertEquals(report.getBandwidthOut().limit, -1.0); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java deleted file mode 100644 index 80ab649000075..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.loadbalance; - -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Optional; -import lombok.Cleanup; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.PulsarServerException; -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; -import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; -import org.testng.Assert; -import org.testng.annotations.Test; - -@Slf4j -@Test(groups = "broker") -public class SimpleBrokerStartTest { - - public void testHasNICSpeed() throws Exception { - if (!LinuxInfoUtils.isLinux()) { - return; - } - // Start local bookkeeper ensemble - @Cleanup("stop") - LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); - bkEnsemble.start(); - // Start broker - ServiceConfiguration config = new ServiceConfiguration(); - config.setClusterName("use"); - config.setWebServicePort(Optional.of(0)); - config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); - config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); - config.setBrokerServicePort(Optional.of(0)); - config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); - config.setBrokerServicePortTls(Optional.of(0)); - config.setWebServicePortTls(Optional.of(0)); - config.setAdvertisedAddress("localhost"); - boolean hasNicSpeeds = LinuxInfoUtils.checkHasNicSpeeds(); - if (hasNicSpeeds) { - @Cleanup - PulsarService pulsarService = new PulsarService(config); - pulsarService.start(); - } - } - - public void testNoNICSpeed() throws Exception { - if (!LinuxInfoUtils.isLinux()) { - return; - } - // Start local bookkeeper ensemble - @Cleanup("stop") - LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); - bkEnsemble.start(); - // Start broker - ServiceConfiguration config = new ServiceConfiguration(); - config.setClusterName("use"); - config.setWebServicePort(Optional.of(0)); - config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); - config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); - config.setBrokerServicePort(Optional.of(0)); - config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); - config.setBrokerServicePortTls(Optional.of(0)); - config.setWebServicePortTls(Optional.of(0)); - config.setAdvertisedAddress("localhost"); - boolean hasNicSpeeds = LinuxInfoUtils.checkHasNicSpeeds(); - if (!hasNicSpeeds) { - @Cleanup - PulsarService pulsarService = new PulsarService(config); - try { - pulsarService.start(); - fail("unexpected behaviour"); - } catch (PulsarServerException ex) { - assertTrue(ex.getCause() instanceof IllegalStateException); - } - } - } - - - @Test - public void testCGroupMetrics() { - if (!LinuxInfoUtils.isLinux()) { - return; - } - - boolean existsCGroup = Files.exists(Paths.get("/sys/fs/cgroup")); - boolean cGroupEnabled = LinuxInfoUtils.isCGroupEnabled(); - Assert.assertEquals(cGroupEnabled, existsCGroup); - - double totalCpuLimit = LinuxInfoUtils.getTotalCpuLimit(cGroupEnabled); - log.info("totalCpuLimit: {}", totalCpuLimit); - Assert.assertTrue(totalCpuLimit > 0.0); - - if (cGroupEnabled) { - Assert.assertNotNull(LinuxInfoUtils.getMetrics()); - - long cpuUsageForCGroup = LinuxInfoUtils.getCpuUsageForCGroup(); - log.info("cpuUsageForCGroup: {}", cpuUsageForCGroup); - Assert.assertTrue(cpuUsageForCGroup > 0); - } - } - -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index f71aa9a522520..ceff0d8aaed47 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -111,7 +111,6 @@ void setup() throws Exception { config1.setWebServicePort(Optional.of(0)); config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config1.setBrokerShutdownTimeoutMs(0L); - config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config1.setBrokerServicePort(Optional.of(0)); config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config1.setBrokerServicePortTls(Optional.of(0)); @@ -131,7 +130,6 @@ void setup() throws Exception { config2.setWebServicePort(Optional.of(0)); config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config2.setBrokerShutdownTimeoutMs(0L); - config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config2.setBrokerServicePort(Optional.of(0)); config2.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config2.setBrokerServicePortTls(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java index 3b9c839775aa5..9ff266ba96ce0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java @@ -64,7 +64,6 @@ void setup() throws Exception { config.setAdvertisedAddress("localhost"); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java deleted file mode 100644 index b6a625a4fe19e..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.loadbalance.impl; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import lombok.Cleanup; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils; -import org.testng.Assert; -import org.testng.annotations.Test; - -@Slf4j -public class LinuxBrokerHostUsageImplTest { - - @Test - public void checkOverrideBrokerNicSpeedGbps() { - @Cleanup("shutdown") - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - LinuxBrokerHostUsageImpl linuxBrokerHostUsage = - new LinuxBrokerHostUsageImpl(1, Optional.of(3.0), executorService); - List nics = new ArrayList<>(); - nics.add("1"); - nics.add("2"); - nics.add("3"); - double totalLimit = linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics); - Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3); - } - - @Test - public void testCpuUsage() throws InterruptedException { - if (!LinuxInfoUtils.isLinux()) { - return; - } - - @Cleanup("shutdown") - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - LinuxBrokerHostUsageImpl linuxBrokerHostUsage = - new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(), executorService); - - linuxBrokerHostUsage.calculateBrokerHostUsage(); - TimeUnit.SECONDS.sleep(1); - linuxBrokerHostUsage.calculateBrokerHostUsage(); - - double usage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().usage; - double limit = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().limit; - float percentUsage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().percentUsage(); - - Assert.assertTrue(usage > 0); - Assert.assertTrue(limit > 0); - Assert.assertTrue(limit >= usage); - Assert.assertTrue(percentUsage > 0); - - log.info("usage: {}, limit: {}, percentUsage: {}", usage, limit, percentUsage); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index dae160b1421f0..be87320e6482b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -150,7 +150,6 @@ void setup() throws Exception { config1.setAdvertisedAddress("localhost"); config1.setBrokerShutdownTimeoutMs(0L); - config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config1.setBrokerServicePort(Optional.of(0)); config1.setBrokerServicePortTls(Optional.of(0)); config1.setWebServicePortTls(Optional.of(0)); @@ -170,7 +169,6 @@ void setup() throws Exception { config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config2.setAdvertisedAddress("localhost"); config2.setBrokerShutdownTimeoutMs(0L); - config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config2.setBrokerServicePort(Optional.of(0)); config2.setBrokerServicePortTls(Optional.of(0)); config2.setWebServicePortTls(Optional.of(0)); @@ -624,7 +622,6 @@ public void testOwnBrokerZnodeByMultipleBroker() throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); PulsarService pulsar = new PulsarService(config); // create znode using different zk-session diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java index c24164acbbf99..c25c6c23fb011 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java @@ -109,7 +109,6 @@ protected void startBroker() throws Exception { conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); conf.setBrokerShutdownTimeoutMs(0L); - conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setBrokerServicePortTls(Optional.of(0)); conf.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index abe58585d4473..1d2878ca64471 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -101,7 +101,6 @@ void setup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setClusterName("usc"); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java index de7d2bba2d43e..b5aaafa6e21bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java @@ -81,7 +81,6 @@ protected void setup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setClusterName("usc"); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index a5d8e62c3bb82..6a9ec94de8424 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -145,11 +145,8 @@ public void testBookieIsolation() throws Exception { config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); config.setClusterName(cluster); config.setWebServicePort(Optional.of(0)); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAdvertisedAddress("localhost"); config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups); @@ -311,7 +308,6 @@ public void testBookieIsolationWithSecondaryGroup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAdvertisedAddress("localhost"); config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups); @@ -453,7 +449,6 @@ public void testDeleteIsolationGroup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAdvertisedAddress("localhost"); config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java index 480894b51f88b..f6a90778c6ffe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java @@ -64,7 +64,6 @@ void setup() { configuration.setWebServicePort(Optional.of(0)); configuration.setClusterName("max_message_test"); configuration.setBrokerShutdownTimeoutMs(0L); - configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); configuration.setBrokerServicePort(Optional.of(0)); configuration.setAuthorizationEnabled(false); configuration.setAuthenticationEnabled(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 5ec0694ab2236..c53d921be1133 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -45,7 +45,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -120,7 +119,6 @@ public void setup() throws Exception { executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-failover-test").build(); ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); - svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); svcConfig.setClusterName("pulsar-cluster"); pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index d81b3041003d2..1ced87a5a2a6f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -30,7 +30,6 @@ import java.lang.reflect.Method; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -80,7 +79,6 @@ public void setup(Method m) throws Exception { super.setUp(m); ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); - svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); @Cleanup PulsarService pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); doReturn(svcConfig).when(pulsar).getConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 0e7a26f0cf954..98be79a932ad0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -178,7 +178,6 @@ public void setup() throws Exception { ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setAdvertisedAddress("localhost"); svcConfig.setBrokerShutdownTimeoutMs(0L); - svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); svcConfig.setMaxUnackedMessagesPerConsumer(50000); svcConfig.setClusterName("pulsar-cluster"); pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 852ea66597796..4270ec448b6d9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -270,7 +270,6 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName, config.setBrokerDeleteInactiveTopicsFrequencySeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setTlsCertificateFilePath(brokerCertFilePath); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index eb1cc05434c2f..c461dd102d079 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -193,7 +193,6 @@ public void setup() throws Exception { executor = OrderedExecutor.newBuilder().numThreads(1).build(); svcConfig = new ServiceConfiguration(); svcConfig.setBrokerShutdownTimeoutMs(0L); - svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS)); svcConfig.setBacklogQuotaCheckEnabled(false); svcConfig.setClusterName("use"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index ccdfb7aa77cb4..8c68c2de91a4f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -92,7 +92,6 @@ void setup() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setClusterName("my-cluster"); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 2d1e72d45c80e..0e2d300f7a802 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -112,7 +112,6 @@ public void setup() throws Exception { ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); - svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); svcConfig.setTransactionCoordinatorEnabled(true); svcConfig.setClusterName("pulsar-cluster"); pulsarMock = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index e8da743a29b78..25c555f09b949 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -170,7 +170,6 @@ protected void startBroker() throws Exception { conf.setBookkeeperClientExposeStatsToPrometheus(true); conf.setForceDeleteNamespaceAllowed(true); conf.setBrokerShutdownTimeoutMs(0L); - conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setBrokerServicePortTls(Optional.of(0)); conf.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java index e017c28693a67..aba5b044583df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -66,7 +66,6 @@ protected final void setup() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setClusterName("my-cluster"); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 4a2fe5b69f4a4..0cd72f19a4db9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -397,7 +397,6 @@ private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowU ServiceConfiguration config = new ServiceConfiguration(); config.setAdvertisedAddress("localhost"); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setWebServicePort(Optional.of(0)); if (enableTls) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 71f3ae73ffcba..ce4c326954018 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -155,7 +155,6 @@ public void testMultipleBrokerLookup() throws Exception { /**** start broker-2 ****/ ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setBrokerShutdownTimeoutMs(0L); - conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setBrokerServicePort(Optional.of(0)); conf2.setWebServicePort(Optional.of(0)); conf2.setAdvertisedAddress("localhost"); @@ -272,7 +271,6 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); - conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setBrokerServicePort(Optional.of(0)); conf2.setWebServicePort(Optional.of(0)); conf2.setAdvertisedAddress("localhost"); @@ -366,7 +364,6 @@ public void testPartitionTopicLookup() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); - conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setBrokerServicePort(Optional.of(0)); conf2.setWebServicePort(Optional.of(0)); conf2.setAdvertisedAddress("localhost"); @@ -443,7 +440,6 @@ public void testWebserviceServiceTls() throws Exception { /**** start broker-2 ****/ ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setBrokerShutdownTimeoutMs(0L); - conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); conf2.setBrokerServicePort(Optional.of(0)); @@ -560,7 +556,6 @@ public void testSplitUnloadLookupTest() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); - conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setBrokerServicePort(Optional.of(0)); conf2.setWebServicePort(Optional.of(0)); conf2.setAdvertisedAddress("localhost"); @@ -665,7 +660,6 @@ public void testModularLoadManagerSplitBundle() throws Exception { // (1) Start broker-1 ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setBrokerShutdownTimeoutMs(0L); - conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); conf2.setBrokerServicePort(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java index faf747f36edc2..c8325908c6c0b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java @@ -86,7 +86,6 @@ void setup(Method method) throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setTlsAllowInsecureConnection(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 46ea5bf04529d..6548dbe26ef3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -944,7 +944,6 @@ void setupReplicationCluster() throws Exception { config1.setBrokerDeleteInactiveTopicsFrequencySeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config1.setBrokerShutdownTimeoutMs(0L); - config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config1.setBrokerServicePort(Optional.of(0)); config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config1.setAllowAutoTopicCreationType("non-partitioned"); @@ -971,7 +970,6 @@ void setupReplicationCluster() throws Exception { config2.setBrokerDeleteInactiveTopicsFrequencySeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config2.setBrokerShutdownTimeoutMs(0L); - config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config2.setBrokerServicePort(Optional.of(0)); config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config2.setAllowAutoTopicCreationType("non-partitioned"); @@ -998,7 +996,6 @@ void setupReplicationCluster() throws Exception { config3.setBrokerDeleteInactiveTopicsFrequencySeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config3.setBrokerShutdownTimeoutMs(0L); - config3.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config3.setBrokerServicePort(Optional.of(0)); config3.setAllowAutoTopicCreationType("non-partitioned"); pulsar3 = new PulsarService(config3); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java index 80408910141a9..e6d050f798573 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java @@ -107,7 +107,6 @@ public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception PulsarService pulsarService1 = pulsar; conf.setBrokerShutdownTimeoutMs(0L); - conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setWebServicePort(Optional.of(0)); restartBroker(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index 455021651803f..173794a2cfb58 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -130,7 +130,6 @@ void setup(Method method) throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index 3f019a39c08b6..7029dc222dbd5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -197,7 +197,6 @@ void setup(Method method) throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index 5d830a67778a4..d985241e2903d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -127,7 +127,6 @@ void setup(Method method) throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 043c2ecccd649..f4a27506c2e10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -90,7 +90,6 @@ void setup() throws Exception { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setWebServicePort(Optional.empty()); config.setWebServicePortTls(Optional.of(webPort)); config.setBrokerServicePort(Optional.empty()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 9ba2ccde471df..a9fd06dc05809 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -94,7 +94,6 @@ void setup(Method method) throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java index 7c0f2f0bbe143..9e1edea2f8029 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java @@ -118,7 +118,6 @@ public void setup(Method method) throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index afe16c63ec283..b3126defa11d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -101,7 +101,6 @@ void setup(Method method) throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index dd097ce034c5f..9f6d9f65dbbd8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -36,7 +36,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; @@ -108,7 +107,6 @@ void setup(Method method) throws Exception { config = spy(ServiceConfiguration.class); config.setBrokerShutdownTimeoutMs(0L); - config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setClusterName("use"); Set superUsers = Sets.newHashSet("superUser", "admin"); config.setSuperUserRoles(superUsers); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 36ec308682300..2e6f403862bf4 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -176,7 +176,6 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT) .withEnv("clusterName", clusterName) .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1") - .withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1") // used in s3 tests .withEnv("AWS_ACCESS_KEY_ID", "accesskey") .withEnv("AWS_SECRET_KEY", "secretkey")