Skip to content

Commit

Permalink
HBASE-26309 Balancer tends to move regions to the server at the end of list (apache#3723)
Browse files Browse the repository at this point in the history

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
clarax authored Nov 2, 2021
1 parent b9b7fec commit 72a8846
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public String getRack(ServerName server) {
serversPerHostList.get(hostIndex).add(serverIndex);

String rack = this.rackManager.getRack(sn);

if (!racksToIndex.containsKey(rack)) {
racksToIndex.put(rack, numRacks++);
serversPerRackList.add(new ArrayList<>());
Expand All @@ -187,6 +188,7 @@ public String getRack(ServerName server) {
serversPerRackList.get(rackIndex).add(serverIndex);
}

LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
// Count how many regions there are.
for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
numRegions += entry.getValue().size();
Expand Down Expand Up @@ -285,6 +287,7 @@ public String getRack(ServerName server) {
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
for (int j = 0; j < serversPerHost[i].length; j++) {
serversPerHost[i][j] = serversPerHostList.get(i).get(j);
LOG.debug("server {} is on host {}",serversPerHostList.get(i).get(j), i);
}
if (serversPerHost[i].length > 1) {
multiServersPerHost = true;
Expand All @@ -295,6 +298,7 @@ public String getRack(ServerName server) {
serversPerRack[i] = new int[serversPerRackList.get(i).size()];
for (int j = 0; j < serversPerRack[i].length; j++) {
serversPerRack[i][j] = serversPerRackList.get(i).get(j);
LOG.info("server {} is on rack {}",serversPerRackList.get(i).get(j), i);
}
}

Expand Down Expand Up @@ -792,6 +796,10 @@ boolean contains(int[] arr, int val) {

private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);

/**
 * Returns the comparator held in {@code numRegionsComparator}, which orders {@code Integer}
 * arguments by the value {@code getNumRegions} returns for each of them.
 *
 * @return the shared comparator instance (not a copy)
 */
public Comparator<Integer> getNumRegionsComparator() {
return numRegionsComparator;
}

int getLowestLocalityRegionOnServer(int serverIndex) {
if (regionFinder != null) {
float lowestLocality = 1.0f;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hbase.master.balancer;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
Expand All @@ -34,27 +35,53 @@ BalanceAction generate(BalancerClusterState cluster) {
private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;

int index = 0;
while (servers[index] == null || servers[index] == thisServer) {
index++;
if (index == servers.length) {
return -1;
int selectedIndex = -1;
double currentLargestRandom = -1;
for (int i = 0; i < servers.length; i++) {
if (servers[i] == null || servers[i] == thisServer) {
continue;
}
if (selectedIndex != -1
&& cluster.getNumRegionsComparator().compare(servers[i], servers[selectedIndex]) != 0) {
// Exhausted servers of the same region count
break;
}
// we don't know how many servers have the same region count, we will randomly select one
// using a simplified inline reservoir sampling by assignmening a random number to stream
// data and choose the greatest one. (http://gregable.com/2007/10/reservoir-sampling.html)
double currentRandom = ThreadLocalRandom.current().nextDouble();
if (currentRandom > currentLargestRandom) {
selectedIndex = i;
currentLargestRandom = currentRandom;
}
}
return servers[index];
return selectedIndex == -1 ? -1 : servers[selectedIndex];
}

private int pickMostLoadedServer(final BalancerClusterState cluster, int thisServer) {
Integer[] servers = cluster.serverIndicesSortedByRegionCount;

int index = servers.length - 1;
while (servers[index] == null || servers[index] == thisServer) {
index--;
if (index < 0) {
return -1;
int selectedIndex = -1;
double currentLargestRandom = -1;
for (int i = servers.length - 1; i >= 0; i--) {
if (servers[i] == null || servers[i] == thisServer) {
continue;
}
if (selectedIndex != -1 && cluster.getNumRegionsComparator().compare(servers[i],
servers[selectedIndex]) != 0) {
// Exhausted servers of the same region count
break;
}
// we don't know how many servers have the same region count, we will randomly select one
// using a simplified inline reservoir sampling by assignmening a random number to stream
// data and choose the greatest one. (http://gregable.com/2007/10/reservoir-sampling.html)
double currentRandom = ThreadLocalRandom.current().nextDouble();
if (currentRandom > currentLargestRandom) {
selectedIndex = i;
currentLargestRandom = currentRandom;
}
}
return servers[index];
return selectedIndex == -1? -1 : servers[selectedIndex];
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,6 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
}

if (idleRegionServerExist(cluster)){
LOG.info("Running balancer because at least one server hosts replicas of the same region." +
"regionReplicaRackCostFunction={}", regionReplicaRackCostFunction.cost());
LOG.info("Running balancer because cluster has idle server(s)."+
" function cost={}", functionCost());
return true;
Expand Down Expand Up @@ -510,9 +508,9 @@ protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
LOG.info("Finished computing new moving plan. Computation took {} ms" +
" to try {} different iterations. Found a solution that moves " +
"{} regions; Going from a computed imbalance of {}" +
" to a new imbalance of {}. ",
" to a new imbalance of {}. funtionCost={}",
endTime - startTime, step, plans.size(),
initCost / sumMultiplier, currentCost / sumMultiplier);
initCost / sumMultiplier, currentCost / sumMultiplier, functionCost());
sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
return plans;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected void testWithCluster(Map<ServerName, List<RegionInfo>> serverMap,
List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);

// Print out the cluster loads to make debugging easier.
LOG.info("Mock Balance : " + printMock(balancedCluster));
LOG.info("Mock after Balance : " + printMock(balancedCluster));

if (assertFullyBalanced) {
assertClusterAsBalanced(balancedCluster);
Expand All @@ -95,4 +95,40 @@ protected void testWithCluster(Map<ServerName, List<RegionInfo>> serverMap,
}
}
}

/**
 * Runs the balancer against a mock cluster over and over, applying each returned plan list to the
 * mock cluster, until the balancer returns {@code null} plans; then performs the requested
 * assertions.
 * <p>
 * The first balancer run must return non-null plans. Iteration only happens when at least one of
 * the two assertion flags is set; there is no iteration cap here, so a balancer that never
 * converges is stopped only by the test timeout.
 *
 * @param serverMap                      mock cluster assignment handed to the balancer and to
 *                                       {@code reconcile}
 * @param rackManager                    rack manager installed on the balancer before running
 * @param assertFullyBalanced            if true, assert that the last balancer run returned no
 *                                       plans
 * @param assertFullyBalancedForReplicas if true, call {@code assertRegionReplicaPlacement} on the
 *                                       final state
 */
protected void testWithClusterWithIteration(Map<ServerName, List<RegionInfo>> serverMap,
  RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
  List<ServerAndLoad> list = convertToList(serverMap);
  LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));

  loadBalancer.setRackManager(rackManager);
  // Run the balancer.
  Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
    (Map) mockClusterServersWithTables(serverMap);
  List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTable);
  assertNotNull("Initial cluster balance should produce plans.", plans);

  // Start from the initial load so the final log below never receives null when the loop body
  // is skipped (both assertion flags false).
  List<ServerAndLoad> balancedCluster = list;
  // Run through iteration until done. Otherwise will be killed as test time out
  while (plans != null && (assertFullyBalanced || assertFullyBalancedForReplicas)) {
    // Apply the plan to the mock cluster.
    // NOTE(review): every pass hands the initial 'list' to reconcile, not the previous result;
    // confirm the loads logged after the first pass are the intended ones.
    balancedCluster = reconcile(list, plans, serverMap);

    // Print out the cluster loads to make debugging easier.
    LOG.info("Mock after balance: " + printMock(balancedCluster));

    loadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
    plans = loadBalancer.balanceCluster(loadOfAllTable);
  }

  // Print out the cluster loads to make debugging easier.
  LOG.info("Mock Final balance: " + printMock(balancedCluster));

  if (assertFullyBalanced) {
    assertNull("Given a requirement to be fully balanced, second attempt at plans should " +
      "produce none.", plans);
  }
  if (assertFullyBalancedForReplicas) {
    assertRegionReplicaPlacement(serverMap, rackManager);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand All @@ -37,25 +38,36 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends Stochastic
HBaseClassTestRule.forClass(TestStochasticLoadBalancerRegionReplicaWithRacks.class);

/**
 * Rack manager for tests that assigns servers to {@code numRacks} racks round-robin, in the order
 * in which each server name is first passed to {@link #getRack(ServerName)}.
 * <p>
 * The assignment is remembered per server name, so repeated lookups of the same server return
 * the same rack. It does not depend on {@code ServerName.hashCode()}.
 */
private static class ForTestRackManager extends RackManager {

  private final int numRacks;
  // Server name -> ordinal of its first lookup (0, 1, 2, ...).
  private final Map<String, Integer> serverIndexes = new HashMap<>();
  // Number of distinct server names seen so far; the next new server gets this ordinal.
  private int numServers = 0;

  /**
   * @param numRacks number of racks to spread servers over
   */
  public ForTestRackManager(int numRacks) {
    this.numRacks = numRacks;
  }

  /**
   * Returns {@code "rack_" + (ordinal % numRacks)}, where the ordinal is the position of this
   * server name in first-lookup order.
   */
  @Override
  public String getRack(ServerName server) {
    int serverIndex =
      serverIndexes.computeIfAbsent(server.getServerName(), name -> numServers++);
    return "rack_" + (serverIndex % numRacks);
  }
}

@Test
public void testRegionReplicationOnMidClusterWithRacks() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
// for full balance
// conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.001f);
loadBalancer.onConfigurationChange(conf);
int numNodes = 4;
int numNodes = 5;
int numRegions = numNodes * 1;
int replication = 3; // 3 replicas per region
int numRegionsPerServer = 1;
Expand All @@ -65,6 +77,26 @@ public void testRegionReplicationOnMidClusterWithRacks() {
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
RackManager rm = new ForTestRackManager(numRacks);

testWithCluster(serverMap, rm, false, true);
testWithClusterWithIteration(serverMap, rm, true, true);
}

/**
 * Builds a mock cluster via {@code createServerMap} with 100 nodes, 3000 regions and 3 replicas
 * per region, places it on 4 racks, and runs {@code testWithClusterWithIteration} with both
 * assertion flags enabled.
 */
@Test
public void testRegionReplicationOnLargeClusterWithRacks() {
  // Mock cluster dimensions.
  final int nodeCount = 100;
  final int regionCount = nodeCount * 30;
  final int replicaCount = 3; // 3 replicas per region
  final int regionsPerServer = 28;
  final int tableCount = 1;
  final int rackCount = 4; // all replicas should be on a different rack

  // Balancer settings for this test, applied before the mock cluster is built.
  conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", false);
  conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 5000L);
  conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
  conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10 * 1000); // 10 sec
  loadBalancer.onConfigurationChange(conf);

  Map<ServerName, List<RegionInfo>> serverMap =
    createServerMap(nodeCount, regionCount, regionsPerServer, replicaCount, tableCount);
  RackManager rackManager = new ForTestRackManager(rackCount);

  testWithClusterWithIteration(serverMap, rackManager, true, true);
}
}

0 comments on commit 72a8846

Please sign in to comment.