HBASE-26304 Reflect out of band locality improvements in metrics and balancer (apache#3803)

Signed-off-by: Duo Zhang <[email protected]>
bbeaudreault authored Nov 27, 2021
1 parent 33287ac commit 1b27124
Showing 10 changed files with 615 additions and 12 deletions.

RegionHDFSBlockLocationFinder.java

@@ -32,6 +32,8 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -40,7 +42,6 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
@@ -58,6 +59,7 @@
 class RegionHDFSBlockLocationFinder extends Configured {
   private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class);
   private static final long CACHE_TIME = 240 * 60 * 1000;
+  private static final float EPSILON = 0.0001f;
   private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
     new HDFSBlocksDistribution();
   private volatile ClusterMetrics status;
@@ -110,12 +112,70 @@ void setClusterInfoProvider(ClusterInfoProvider provider) {

   void setClusterMetrics(ClusterMetrics status) {
     long currentTime = EnvironmentEdgeManager.currentTime();
-    this.status = status;

     if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
+      this.status = status;
       // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
       lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
+    } else {
+      refreshLocalityChangedRegions(this.status, status);
+      this.status = status;
     }
   }

+  /**
+   * If locality for a region has changed, that pretty certainly means our cache is out of date.
+   * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
+   */
+  private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
+    if (oldStatus == null || newStatus == null) {
+      LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}",
+        oldStatus, newStatus);
+      return;
+    }
+
+    Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
+    Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
+
+    Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
+    for (RegionInfo regionInfo : cache.asMap().keySet()) {
+      regionsByName.put(regionInfo.getEncodedName(), regionInfo);
+    }
+
+    for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
+      Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
+      for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
+        String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
+        RegionInfo region = regionsByName.get(encodedName);
+        if (region == null) {
+          continue;
+        }
+
+        float newLocality = regionEntry.getValue().getDataLocality();
+        float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
+
+        if (Math.abs(newLocality - oldLocality) > EPSILON) {
+          LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
+            region.getEncodedName(), oldLocality, newLocality);
+          cache.refresh(region);
+        }
+      }
+
+    }
+  }
+
+  private float getOldLocality(ServerName newServer, byte[] regionName,
+    Map<ServerName, ServerMetrics> oldServers) {
+    ServerMetrics serverMetrics = oldServers.get(newServer);
+    if (serverMetrics == null) {
+      return -1f;
+    }
+    RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
+    if (regionMetrics == null) {
+      return -1f;
+    }
+
+    return regionMetrics.getDataLocality();
+  }
+
   /**
@@ -159,7 +219,7 @@ private HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) {
         return blocksDistribution;
       }
     } catch (IOException ioe) {
-      LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = " +
+      LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}",
        region.getEncodedName(), ioe);
     }

@@ -263,7 +323,7 @@ void refreshAndWait(Collection<RegionInfo> hris) {
       } catch (InterruptedException ite) {
         Thread.currentThread().interrupt();
       } catch (ExecutionException ee) {
-        LOG.debug("ExecutionException during HDFSBlocksDistribution computation. for region = " +
+        LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
          hregionInfo.getEncodedName(), ee);
       }
       index++;
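The refresh path above leans on the finder's Guava LoadingCache: cache.refresh(region) reloads an entry while readers keep getting the previously cached HDFSBlocksDistribution, the EPSILON guard keeps float noise from triggering reloads, and the -1f sentinel from getOldLocality guarantees a reload for any region that moved servers. A minimal, self-contained sketch of that refresh behavior (illustrative names only, using plain Guava rather than the shaded hbase-thirdparty classes):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class RefreshSketch {
  public static void main(String[] args) throws Exception {
    AtomicInteger loads = new AtomicInteger();
    LoadingCache<String, Integer> cache = CacheBuilder.newBuilder()
      // mirrors CACHE_TIME (240 minutes) in RegionHDFSBlockLocationFinder
      .expireAfterWrite(240, TimeUnit.MINUTES)
      .build(new CacheLoader<String, Integer>() {
        @Override
        public Integer load(String key) {
          // stand-in for the expensive HDFS block-location computation
          return loads.incrementAndGet();
        }
      });

    System.out.println(cache.get("region-a")); // 1: first access runs load()
    cache.refresh("region-a"); // re-runs load(); concurrent readers see the old value meanwhile
    System.out.println(cache.get("region-a")); // 2: the refreshed value
  }
}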

TestRegionHDFSBlockLocationFinder.java

@@ -19,6 +19,7 @@

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -31,12 +32,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.TreeMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
+import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -204,4 +207,59 @@ public void testGetTopBlockLocations() {
       }
     }
   }
+
+  @Test
+  public void testRefreshRegionsWithChangedLocality() {
+    ServerName testServer = ServerName.valueOf("host-0", 12345, 12345);
+    RegionInfo testRegion = REGIONS.get(0);
+
+    Map<RegionInfo, HDFSBlocksDistribution> cache = new HashMap<>();
+    for (RegionInfo region : REGIONS) {
+      HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+      assertHostAndWeightEquals(generate(region), hbd);
+      cache.put(region, hbd);
+    }
+
+    finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(),
+      0.123f));
+
+    // everything should be cached, because metrics were null before
+    for (RegionInfo region : REGIONS) {
+      HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+      assertSame(cache.get(region), hbd);
+    }
+
+    finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(),
+      0.345f));
+
+    // locality changed just for our test region, so it should no longer be the same
+    for (RegionInfo region : REGIONS) {
+      HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+      if (region.equals(testRegion)) {
+        assertNotSame(cache.get(region), hbd);
+      } else {
+        assertSame(cache.get(region), hbd);
+      }
+    }
+  }
+
+  private ClusterMetrics getMetricsWithLocality(ServerName serverName, byte[] region,
+    float locality) {
+    RegionMetrics regionMetrics = mock(RegionMetrics.class);
+    when(regionMetrics.getDataLocality()).thenReturn(locality);
+
+    Map<byte[], RegionMetrics> regionMetricsMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    regionMetricsMap.put(region, regionMetrics);
+
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    when(serverMetrics.getRegionMetrics()).thenReturn(regionMetricsMap);
+
+    Map<ServerName, ServerMetrics> serverMetricsMap = new HashMap<>();
+    serverMetricsMap.put(serverName, serverMetrics);
+
+    ClusterMetrics metrics = mock(ClusterMetrics.class);
+    when(metrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+
+    return metrics;
+  }
 }
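To run just the new test locally (standard HBase Maven setup assumed):

mvn test -Dtest=TestRegionHDFSBlockLocationFinder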
19 changes: 19 additions & 0 deletions hbase-common/src/main/resources/hbase-default.xml
@@ -2042,4 +2042,23 @@ possible configurations would overwhelm and obscure the important.
     the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
     </description>
   </property>
+  <property>
+    <name>hbase.locality.inputstream.derive.enabled</name>
+    <value>false</value>
+    <description>
+      If true, derive StoreFile locality metrics from the underlying DFSInputStream
+      backing reads for that StoreFile. This value will update as the DFSInputStream's
+      block locations are updated over time. Otherwise, locality is computed on StoreFile
+      open, and cached until the StoreFile is closed.
+    </description>
+  </property>
+  <property>
+    <name>hbase.locality.inputstream.derive.cache.period</name>
+    <value>60000</value>
+    <description>
+      If deriving StoreFile locality metrics from the underlying DFSInputStream, how
+      long should the derived values be cached for. The derivation process may involve
+      hitting the namenode, if the DFSInputStream's block list is incomplete.
+    </description>
+  </property>
 </configuration>
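The two properties above can be overridden like any other HBase setting. A sketch using the Configuration API (the property names come from the diff; the surrounding class is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class DerivedLocalityConfig {
  public static void main(String[] args) {
    // Loads hbase-default.xml (with the defaults added above) plus any hbase-site.xml overrides.
    Configuration conf = HBaseConfiguration.create();

    // Opt in to deriving StoreFile locality from the live DFSInputStream.
    conf.setBoolean("hbase.locality.inputstream.derive.enabled", true);

    // Re-derive at most every 5 minutes instead of the default 60000 ms, limiting
    // how often an incomplete block list can trigger namenode lookups.
    conf.setLong("hbase.locality.inputstream.derive.cache.period", 5 * 60 * 1000L);

    System.out.println(conf.getLong("hbase.locality.inputstream.derive.cache.period", -1));
  }
}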

FileLink.java

@@ -126,6 +126,10 @@ public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
       this.in = tryOpen();
     }

+    private FSDataInputStream getUnderlyingInputStream() {
+      return in;
+    }
+
     @Override
     public int read() throws IOException {
       int res;
@@ -475,6 +479,17 @@ public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
     return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
   }

+  /**
+   * If the passed FSDataInputStream is backed by a FileLink, returns the underlying
+   * InputStream for the resolved link target. Otherwise, returns null.
+   */
+  public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) {
+    if (stream.getWrappedStream() instanceof FileLinkInputStream) {
+      return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream();
+    }
+    return null;
+  }
+
   /**
    * NOTE: This method must be used only in the constructor!
    * It creates a List with the specified locations for the link.
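A likely call pattern for the new helper: code that needs the real input stream behind a store file (for example, to read block locations off a DFSInputStream) unwraps a FileLink-backed stream and otherwise falls back to the stream it was given. A sketch under that assumption (the class and method here are illustrative, not part of the patch):

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.io.FileLink;

public final class StreamUnwrapSketch {
  private StreamUnwrapSketch() {
  }

  /** Returns the link target's stream when the argument is FileLink-backed, else the argument. */
  public static FSDataInputStream resolve(FSDataInputStream stream) {
    FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
    return underlying != null ? underlying : stream;
  }
}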

HStoreFile.java

@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -127,6 +128,7 @@ public class HStoreFile implements StoreFile {

   // StoreFile.Reader
   private volatile StoreFileReader initialReader;
+  private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;

   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
@@ -344,7 +346,11 @@ public OptionalLong getBulkLoadTimestamp() {
    * file is opened.
    */
   public HDFSBlocksDistribution getHDFSBlockDistribution() {
-    return this.fileInfo.getHDFSBlockDistribution();
+    if (initialReaderBlockDistribution != null) {
+      return initialReaderBlockDistribution.getHDFSBlockDistribution();
+    } else {
+      return this.fileInfo.getHDFSBlockDistribution();
+    }
   }

   /**
@@ -362,6 +368,13 @@ private void open() throws IOException {
       fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
     }
     this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
+
+    if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) {
+      boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
+      FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum);
+      this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo);
+    }
+
     // Load up indices and fileinfo. This also loads Bloom filter type.
     metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());

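Callers of getHDFSBlockDistribution() are unchanged: whether the distribution is the static on-open snapshot or the stream-derived one, it is consumed the same way, typically collapsed into a per-host locality index. A self-contained sketch of that consumption (the weights are made up; assumes HDFSBlocksDistribution's addHostsAndBlockWeight/getBlockLocalityIndex API):

import org.apache.hadoop.hbase.HDFSBlocksDistribution;

public class LocalityIndexSketch {
  public static void main(String[] args) {
    HDFSBlocksDistribution dist = new HDFSBlocksDistribution();
    // One 100-byte block replicated on host-a and host-b, one 50-byte block only on host-b.
    dist.addHostsAndBlockWeight(new String[] { "host-a", "host-b" }, 100);
    dist.addHostsAndBlockWeight(new String[] { "host-b" }, 50);

    // locality index = bytes hosted locally / total bytes; 1.0 means fully local
    System.out.println(dist.getBlockLocalityIndex("host-a")); // 100/150 ~= 0.67
    System.out.println(dist.getBlockLocalityIndex("host-b")); // 150/150 = 1.0
  }
}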