Skip to content

Commit

Permalink
HBASE-26337 Optimization for weighted random generators (apache#3732)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
clarax authored Nov 9, 2021
1 parent 601467f commit 62cd2b6
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList
}

protected abstract double getCostFromRl(BalancerRegionLoad rl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ protected void regionMoved(int region, int oldServer, int newServer) {

protected abstract double cost();

/**
 * Contributes this cost function's current cost to the weight slot of the candidate generator
 * that is optimized for it. This implementation credits the slot indexed by
 * {@link StochasticLoadBalancer.GeneratorType#RANDOM}.
 * Called once per init or after postAction.
 * @param weights the weights for every generator, indexed by generator type ordinal.
 */
public void updateWeight(double[] weights) {
  final int randomSlot = StochasticLoadBalancer.GeneratorType.RANDOM.ordinal();
  weights[randomSlot] = weights[randomSlot] + cost();
}

/**
* Scale the value between 0 and 1.
* @param min Min value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ protected List<CandidateGenerator> createCandidateGenerators() {
return fnPickers;
}

/**
 * Picks one of the candidate generators uniformly at random; no generator weights are
 * consulted.
 * @return a candidate generator chosen with equal probability from {@code candidateGenerators}
 */
@Override
protected CandidateGenerator getRandomGenerator() {
  final int pick = ThreadLocalRandom.current().nextInt(candidateGenerators.size());
  return candidateGenerators.get(pick);
}

/**
* Round robin assignment: Segregate the regions into two types:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,8 @@ private double getWeightedLocality(int region, int entity) {
return cluster.getOrComputeWeightedLocality(region, entity, type);
}

}
/**
 * Adds this function's current cost to the weight slot indexed by
 * {@link StochasticLoadBalancer.GeneratorType#LOCALITY}.
 * @param weights the per-generator weights, indexed by generator type ordinal
 */
@Override
public final void updateWeight(double[] weights) {
  final int localitySlot = StochasticLoadBalancer.GeneratorType.LOCALITY.ordinal();
  weights[localitySlot] = weights[localitySlot] + cost();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ protected void regionMoved(int region, int oldServer, int newServer) {
costs[newServer] = cluster.regionsPerServer[newServer].length;
});
}

/**
 * Adds this function's current cost to the weight slot indexed by
 * {@link StochasticLoadBalancer.GeneratorType#LOAD}.
 * @param weights the per-generator weights, indexed by generator type ordinal
 */
@Override
public final void updateWeight(double[] weights) {
  final int loadSlot = StochasticLoadBalancer.GeneratorType.LOAD.ordinal();
  weights[loadSlot] = weights[loadSlot] + cost();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ protected double cost() {
return scale(0, maxCost, totalCost);
}

/**
 * Adds this function's current cost to the weight slot indexed by
 * {@link StochasticLoadBalancer.GeneratorType#RACK}.
 * @param weights the per-generator weights, indexed by generator type ordinal
 */
@Override
public final void updateWeight(double[] weights) {
  final int rackSlot = StochasticLoadBalancer.GeneratorType.RACK.ordinal();
  weights[rackSlot] = weights[rackSlot] + cost();
}

/**
* For each primary region, it computes the total number of replicas in the array (numReplicas)
* and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.025f;

private List<CandidateGenerator> candidateGenerators;
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
// Holds the sum of the currently configured multipliers. Defaults to 1 for cases that carry high cost
private float sumMultiplier;
// to save and report costs to JMX
private double curOverallCost = 0d;
private double[] tempFunctionCosts;
private double[] curFunctionCosts;
private double[] weightsOfGenerators;

// Keep locality based picker and cost function to alert them
// when new services are offered
Expand All @@ -146,6 +146,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

protected List<CandidateGenerator> candidateGenerators;

/**
 * Identifies the built-in candidate generators. Declaration order matters: each constant's
 * {@code ordinal()} is used as an index into the generator list and into the per-generator
 * weight array, so the constants must not be reordered.
 */
public enum GeneratorType {
  /** Index of the {@code RandomCandidateGenerator}. */
  RANDOM,
  /** Index of the {@code LoadCandidateGenerator}. */
  LOAD,
  /** Index of the locality candidate generator. */
  LOCALITY,
  /** Index of the {@code RegionReplicaRackCandidateGenerator}. */
  RACK
}

/**
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
* default MetricsBalancer
Expand Down Expand Up @@ -204,10 +210,11 @@ protected float getDefaultSlop() {

/**
 * Builds the list of candidate generators.
 * NOTE(review): both un-indexed {@code add(...)} calls and {@code GeneratorType}-indexed
 * {@code add(index, ...)} calls appear below; this looks like the old and new lines of a
 * change shown together -- confirm which set is current.
 * @return the newly created list of candidate generators
 */
protected List<CandidateGenerator> createCandidateGenerators() {
List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
candidateGenerators.add(new RandomCandidateGenerator());
candidateGenerators.add(new LoadCandidateGenerator());
candidateGenerators.add(localityCandidateGenerator);
candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
// Each generator below is inserted at the index given by its GeneratorType ordinal.
candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
candidateGenerators.add(GeneratorType.RACK.ordinal(),
new RegionReplicaRackCandidateGenerator());
return candidateGenerators;
}

Expand Down Expand Up @@ -380,8 +387,33 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
/**
 * Produces the next balance action to try by asking a candidate generator to generate one for
 * the given cluster state.
 * @param cluster the cluster state handed to the generator
 * @return the action produced by the chosen generator
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
BalanceAction nextAction(BalancerClusterState cluster) {
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()))
.generate(cluster);
return getRandomGenerator().generate(cluster);
}

/**
 * Select the candidate generator to use based on the cost of the cost functions. The chance of
 * selecting a candidate generator is proportional to its entry in {@code weightsOfGenerators},
 * i.e. to the summed cost that the cost functions credited to it.
 * <p>
 * The weights are only read, never modified, so calling this method repeatedly between two
 * weight updates always draws from the same distribution.
 * @return the selected generator; the first generator when the weights sum to zero
 */
protected CandidateGenerator getRandomGenerator() {
  double total = 0;
  for (double weight : weightsOfGenerators) {
    total += weight;
  }
  if (total == 0) {
    // No cost function contributed any weight: fall back to the first generator.
    return candidateGenerators.get(0);
  }
  // nextDouble() lies in [0, 1), so the target lies in [0, total).
  final double target = ThreadLocalRandom.current().nextDouble() * total;
  double cumulative = 0;
  for (int i = 0; i < weightsOfGenerators.length; i++) {
    cumulative += weightsOfGenerators[i];
    // Strict comparison so that a generator with zero weight is never chosen.
    if (target < cumulative) {
      return candidateGenerators.get(i);
    }
  }
  // Only reachable through floating point rounding (or a NaN weight).
  return candidateGenerators.get(candidateGenerators.size() - 1);
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
Expand Down Expand Up @@ -474,7 +506,7 @@ protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
}

cluster.doAction(action);
updateCostsWithAction(cluster, action);
updateCostsAndWeightsWithAction(cluster, action);

newCost = computeCost(cluster, currentCost);

Expand All @@ -490,7 +522,7 @@ protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
// TODO: undo by remembering old values
BalanceAction undoAction = action.undoAction();
cluster.doAction(undoAction);
updateCostsWithAction(cluster, undoAction);
updateCostsAndWeightsWithAction(cluster, undoAction);
}

if (EnvironmentEdgeManager.currentTime() - startTime >
Expand Down Expand Up @@ -685,17 +717,28 @@ private void updateRegionLoad() {
/**
 * Prepares every cost function for the given cluster state and rebuilds the generator weights
 * from scratch: a fresh zero-filled weight array sized to the number of candidate generators
 * is allocated, and each cost function then adds its contribution to it.
 * NOTE(review): unlike {@code updateCostsAndWeightsWithAction}, weights are collected from
 * every cost function here, not only from those reporting {@code isNeeded()} -- confirm this
 * is intended.
 * @param cluster the cluster state passed to each cost function's {@code prepare}
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void initCosts(BalancerClusterState cluster) {
  // One weight slot per candidate generator, recreated on every call.
  final int generatorCount = this.candidateGenerators.size();
  weightsOfGenerators = new double[generatorCount];
  for (CostFunction costFunction : costFunctions) {
    costFunction.prepare(cluster);
    costFunction.updateWeight(weightsOfGenerators);
  }
}

/**
 * Update both the costs of the cost functions and the weights of the candidate generators for
 * the given action. All generator weights are first reset to zero; then every cost function
 * that reports {@code isNeeded()} is notified of the action through {@code postAction} and adds
 * its contribution back through {@code updateWeight}.
 * @param cluster the cluster state (not read by this method's body)
 * @param action the action passed to each needed cost function's {@code postAction}
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
// Reset all the weights to 0
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] = 0;
}
for (CostFunction c : costFunctions) {
// Cost functions that are not needed neither see the action nor contribute any weight.
if (c.isNeeded()) {
c.postAction(action);
c.updateWeight(weightsOfGenerators);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public class StochasticBalancerTestBase extends BalancerTestBase {

protected static StochasticLoadBalancer loadBalancer;

protected static DummyMetricsStochasticBalancer dummyMetricsStochasticBalancer = new DummyMetricsStochasticBalancer();
protected static DummyMetricsStochasticBalancer dummyMetricsStochasticBalancer = new
DummyMetricsStochasticBalancer();

@BeforeClass
public static void beforeAllTests() throws Exception {
Expand All @@ -58,7 +59,17 @@ protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerSe
boolean assertFullyBalancedForReplicas) {
Map<ServerName, List<RegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
testWithCluster(serverMap, null, assertFullyBalanced, assertFullyBalancedForReplicas);
testWithCluster(serverMap, null, assertFullyBalanced,
assertFullyBalancedForReplicas);
}

/**
 * Convenience overload: builds a server map from the given sizing parameters via
 * {@code createServerMap} and delegates to the map-based {@code testWithClusterWithIteration},
 * passing {@code null} as its second argument.
 */
protected void testWithClusterWithIteration(int numNodes, int numRegions, int numRegionsPerServer,
    int replication, int numTables, boolean assertFullyBalanced,
    boolean assertFullyBalancedForReplicas) {
  final Map<ServerName, List<RegionInfo>> regionsByServer =
    createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
  testWithClusterWithIteration(regionsByServer, null, assertFullyBalanced,
    assertFullyBalancedForReplicas);
}

protected void testWithCluster(Map<ServerName, List<RegionInfo>> serverMap,
Expand Down Expand Up @@ -102,7 +113,8 @@ protected void testWithClusterWithIteration(Map<ServerName, List<RegionInfo>> se

loadBalancer.setRackManager(rackManager);
// Run the balancer.
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(serverMap);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
assertNotNull("Initial cluster balance should produce plans.", plans);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,10 @@ public void testCostAfterUndoAction() {
final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
BalanceAction action = loadBalancer.nextAction(cluster);
cluster.doAction(action);
loadBalancer.updateCostsWithAction(cluster, action);
loadBalancer.updateCostsAndWeightsWithAction(cluster, action);
BalanceAction undoAction = action.undoAction();
cluster.doAction(undoAction);
loadBalancer.updateCostsWithAction(cluster, undoAction);
loadBalancer.updateCostsAndWeightsWithAction(cluster, undoAction);
final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
assertEquals(expectedCost, actualCost, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ public static void beforeAllTests() throws IOException {
RULES_FILE = HTU.getDataTestDir(DEFAULT_RULES_FILE_NAME).toString();
conf.set(HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
RULES_FILE);
loadBalancer = new StochasticLoadBalancer();
loadBalancer = new StochasticLoadTestBalancer();
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
loadBalancer.initialize();
loadBalancer.getCandidateGenerators().add(new FairRandomCandidateGenerator());
}

@Test
Expand Down Expand Up @@ -302,4 +301,14 @@ BalanceAction generate(BalancerClusterState cluster) {
return super.generate(cluster);
}
}

/**
 * Balancer variant for these tests: it overrides the generator selection so that the same
 * {@code FairRandomCandidateGenerator} instance is always returned.
 */
static class StochasticLoadTestBalancer extends StochasticLoadBalancer {
  private final FairRandomCandidateGenerator fairGenerator = new FairRandomCandidateGenerator();

  /**
   * @return the single fair random generator owned by this balancer
   */
  @Override
  protected CandidateGenerator getRandomGenerator() {
    return fairGenerator;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void testLargeCluster() {
int replication = 1;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
loadBalancer.onConfigurationChange(conf);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
}
}

0 comments on commit 62cd2b6

Please sign in to comment.