Skip to content

Commit

Permalink
[improve][broker] Make BrokerSelectionStrategy pluggable (apache#22553)
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower authored Apr 23, 2024
1 parent 358c7cc commit 89b201e
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
Expand All @@ -104,7 +105,7 @@
import org.slf4j.Logger;

@Slf4j
public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerSelectionStrategyFactory {

public static final String BROKER_LOAD_DATA_STORE_TOPIC = TopicName.get(
TopicDomain.non_persistent.value(),
Expand Down Expand Up @@ -252,6 +253,11 @@ public Set<NamespaceBundle> getOwnedServiceUnits() {
return ownedServiceUnits;
}

@Override
public BrokerSelectionStrategy createBrokerSelectionStrategy() {
return new LeastResourceUsageWithWeight();
}

public enum Role {
Leader,
Follower
Expand All @@ -267,8 +273,7 @@ public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline.add(new BrokerLoadManagerClassFilter());
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
this.brokerSelectionStrategy = createBrokerSelectionStrategy();
}

public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.naming.ServiceUnitId;

/**
* The broker selection strategy is designed to select the broker according to different implementations.
*/
@InterfaceStability.Evolving
public interface BrokerSelectionStrategy {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.strategy;

import org.apache.pulsar.common.classification.InterfaceStability;

@InterfaceStability.Stable
public interface BrokerSelectionStrategyFactory {

BrokerSelectionStrategy createBrokerSelectionStrategy();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.strategy;

import java.util.Comparator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Cleanup;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class CustomBrokerSelectionStrategyTest extends MultiBrokerBaseTest {

@Override
protected void startBroker() throws Exception {
addCustomConfigs(conf);
super.startBroker();
}

@Override
protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
return addCustomConfigs(getDefaultConf());
}

private static ServiceConfiguration addCustomConfigs(ServiceConfiguration conf) {
conf.setLoadManagerClassName(CustomExtensibleLoadManager.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerAutoBundleSplitEnabled(false);
conf.setDefaultNumberOfNamespaceBundles(8);
// Don't consider broker's load so the broker will be selected randomly with the default strategy
conf.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(100);
return conf;
}

@Test
public void testSingleBrokerSelected() throws Exception {
final var topic = "test-single-broker-selected";
getAllAdmins().get(0).topics().createPartitionedTopic(topic, 16);
@Cleanup final var producer = (PartitionedProducerImpl<byte[]>) getAllClients().get(0).newProducer()
.topic(topic).create();
Assert.assertNotNull(producer);
final var connections = producer.getProducers().stream().map(ProducerImpl::getClientCnx)
.collect(Collectors.toSet());
Assert.assertEquals(connections.size(), 1);
final var port = Integer.parseInt(connections.stream().findFirst().orElseThrow().ctx().channel()
.remoteAddress().toString().replaceAll(".*:", ""));
final var expectedPort = Stream.concat(Stream.of(pulsar), additionalBrokers.stream())
.min(Comparator.comparingInt(o -> o.getListenPortHTTP().orElseThrow()))
.map(PulsarService::getBrokerListenPort)
.orElseThrow().orElseThrow();
Assert.assertEquals(port, expectedPort);
}

public static class CustomExtensibleLoadManager extends ExtensibleLoadManagerImpl {

@Override
public BrokerSelectionStrategy createBrokerSelectionStrategy() {
// The smallest HTTP port will always be selected because the host parts are all "localhost"
return (brokers, __, ___) -> brokers.stream().sorted().findFirst();
}
}
}

0 comments on commit 89b201e

Please sign in to comment.