diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java index 65a7a3f20774..9634dd1eb309 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java @@ -32,6 +32,8 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -40,7 +42,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; @@ -58,6 +59,7 @@ class RegionHDFSBlockLocationFinder extends Configured { private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class); private static final long CACHE_TIME = 240 * 60 * 1000; + private static final float EPSILON = 0.0001f; private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution(); private volatile ClusterMetrics status; @@ -110,12 +112,70 @@ void setClusterInfoProvider(ClusterInfoProvider provider) { void setClusterMetrics(ClusterMetrics status) { long currentTime = EnvironmentEdgeManager.currentTime(); - this.status = status; + if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) { + this.status = status; // Only count the refresh if it includes user tables ( eg more than meta and namespace ). lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh; + } else { + refreshLocalityChangedRegions(this.status, status); + this.status = status; + } + } + + /** + * If locality for a region has changed, that pretty certainly means our cache is out of date. + * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality. + */ + private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) { + if (oldStatus == null || newStatus == null) { + LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}", + oldStatus, newStatus); + return; + } + + Map oldServers = oldStatus.getLiveServerMetrics(); + Map newServers = newStatus.getLiveServerMetrics(); + + Map regionsByName = new HashMap<>(cache.asMap().size()); + for (RegionInfo regionInfo : cache.asMap().keySet()) { + regionsByName.put(regionInfo.getEncodedName(), regionInfo); + } + + for (Map.Entry serverEntry : newServers.entrySet()) { + Map newRegions = serverEntry.getValue().getRegionMetrics(); + for (Map.Entry regionEntry : newRegions.entrySet()) { + String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey()); + RegionInfo region = regionsByName.get(encodedName); + if (region == null) { + continue; + } + + float newLocality = regionEntry.getValue().getDataLocality(); + float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers); + + if (Math.abs(newLocality - oldLocality) > EPSILON) { + LOG.debug("Locality for region {} changed from {} to {}, refreshing cache", + region.getEncodedName(), oldLocality, newLocality); + cache.refresh(region); + } + } + + } + } + + private float getOldLocality(ServerName newServer, byte[] regionName, + Map oldServers) { + ServerMetrics serverMetrics = oldServers.get(newServer); + if (serverMetrics == null) { + return -1f; + } + RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName); + if (regionMetrics == null) { + return -1f; } + return regionMetrics.getDataLocality(); } /** @@ -159,7 +219,7 @@ private HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) { return blocksDistribution; } } catch (IOException ioe) { - LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = " + + LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}", region.getEncodedName(), ioe); } @@ -263,7 +323,7 @@ void refreshAndWait(Collection hris) { } catch (InterruptedException ite) { Thread.currentThread().interrupt(); } catch (ExecutionException ee) { - LOG.debug("ExecutionException during HDFSBlocksDistribution computation. for region = " + + LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}", hregionInfo.getEncodedName(), ee); } index++; diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java index 8e129e347e2a..a011793c0cb4 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -31,12 +32,14 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight; +import org.apache.hadoop.hbase.RegionMetrics; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -204,4 +207,59 @@ public void testGetTopBlockLocations() { } } } + + @Test + public void testRefreshRegionsWithChangedLocality() { + ServerName testServer = ServerName.valueOf("host-0", 12345, 12345); + RegionInfo testRegion = REGIONS.get(0); + + Map cache = new HashMap<>(); + for (RegionInfo region : REGIONS) { + HDFSBlocksDistribution hbd = finder.getBlockDistribution(region); + assertHostAndWeightEquals(generate(region), hbd); + cache.put(region, hbd); + } + + finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(), + 0.123f)); + + // everything should be cached, because metrics were null before + for (RegionInfo region : REGIONS) { + HDFSBlocksDistribution hbd = finder.getBlockDistribution(region); + assertSame(cache.get(region), hbd); + } + + finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(), + 0.345f)); + + // locality changed just for our test region, so it should no longer be the same + for (RegionInfo region : REGIONS) { + HDFSBlocksDistribution hbd = finder.getBlockDistribution(region); + if (region.equals(testRegion)) { + assertNotSame(cache.get(region), hbd); + } else { + assertSame(cache.get(region), hbd); + } + } + } + + private ClusterMetrics getMetricsWithLocality(ServerName serverName, byte[] region, + float locality) { + RegionMetrics regionMetrics = mock(RegionMetrics.class); + when(regionMetrics.getDataLocality()).thenReturn(locality); + + Map regionMetricsMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + regionMetricsMap.put(region, regionMetrics); + + ServerMetrics serverMetrics = mock(ServerMetrics.class); + when(serverMetrics.getRegionMetrics()).thenReturn(regionMetricsMap); + + Map serverMetricsMap = new HashMap<>(); + serverMetricsMap.put(serverName, serverMetrics); + + ClusterMetrics metrics = mock(ClusterMetrics.class); + when(metrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + + return metrics; + } } diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index ff3f8168fbc3..d14792e78bb9 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -2042,4 +2042,23 @@ possible configurations would overwhelm and obscure the important. the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size + + hbase.locality.inputstream.derive.enabled + false + + If true, derive StoreFile locality metrics from the underlying DFSInputStream + backing reads for that StoreFile. This value will update as the DFSInputStream's + block locations are updated over time. Otherwise, locality is computed on StoreFile + open, and cached until the StoreFile is closed. + + + + hbase.locality.inputstream.derive.cache.period + 60000 + + If deriving StoreFile locality metrics from the underlying DFSInputStream, how + long should the derived values be cached for. The derivation process may involve + hitting the namenode, if the DFSInputStream's block list is incomplete. + + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java index ba84606fd04f..ea285ed53fad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java @@ -126,6 +126,10 @@ public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int buf this.in = tryOpen(); } + private FSDataInputStream getUnderlyingInputStream() { + return in; + } + @Override public int read() throws IOException { int res; @@ -475,6 +479,17 @@ public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOExce return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize)); } + /** + * If the passed FSDataInputStream is backed by a FileLink, returns the underlying + * InputStream for the resolved link target. Otherwise, returns null. + */ + public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) { + if (stream.getWrappedStream() instanceof FileLinkInputStream) { + return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream(); + } + return null; + } + /** * NOTE: This method must be used only in the constructor! * It creates a List with the specified locations for the link. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index 7a7468973af9..ecbc78f3d108 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -127,6 +128,7 @@ public class HStoreFile implements StoreFile { // StoreFile.Reader private volatile StoreFileReader initialReader; + private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null; // Block cache configuration and reference. private final CacheConfig cacheConf; @@ -344,7 +346,11 @@ public OptionalLong getBulkLoadTimestamp() { * file is opened. */ public HDFSBlocksDistribution getHDFSBlockDistribution() { - return this.fileInfo.getHDFSBlockDistribution(); + if (initialReaderBlockDistribution != null) { + return initialReaderBlockDistribution.getHDFSBlockDistribution(); + } else { + return this.fileInfo.getHDFSBlockDistribution(); + } } /** @@ -362,6 +368,13 @@ private void open() throws IOException { fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader()); } this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader); + + if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) { + boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum(); + FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum); + this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo); + } + // Load up indices and fileinfo. This also loads Bloom filter type. metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java new file mode 100644 index 000000000000..aa15cda922d7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.errorprone.annotations.RestrictedApi; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.io.FileLink; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Computes the HDFSBlockDistribution for a file based on the underlying located blocks + * for an HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves + * allocating an array of numBlocks size per call. It may also involve calling the namenode, if + * the DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure, + * we cache the computed distribution for a configurable period of time. + *

+ * This class only gets instantiated for the first FSDataInputStream of each StoreFile (i.e. + * the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the + * value returned by {@link HStoreFile#getHDFSBlockDistribution()}. + *

+ * Once the backing FSDataInputStream is closed, we should not expect the distribution result + * to change anymore. This is ok becuase the initialReader's InputStream is only closed when the + * StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution + * anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will + * be created for the new initialReader. + */ +@InterfaceAudience.Private +public class InputStreamBlockDistribution { + private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class); + + private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = + "hbase.locality.inputstream.derive.enabled"; + private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false; + + private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = + "hbase.locality.inputstream.derive.cache.period"; + private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000; + + private final FSDataInputStream stream; + private final StoreFileInfo fileInfo; + private final int cachePeriodMs; + + private HDFSBlocksDistribution hdfsBlocksDistribution; + private long lastCachedAt; + private boolean streamUnsupported; + + /** + * This should only be called for the first FSDataInputStream of a StoreFile, + * in {@link HStoreFile#open()}. + * + * @see InputStreamBlockDistribution + * @param stream the input stream to derive locality from + * @param fileInfo the StoreFileInfo for the related store file + */ + public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) { + this.stream = stream; + this.fileInfo = fileInfo; + this.cachePeriodMs = fileInfo.getConf().getInt( + HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD, + DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD); + this.lastCachedAt = EnvironmentEdgeManager.currentTime(); + this.streamUnsupported = false; + this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution(); + } + + /** + * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream + */ + public static boolean isEnabled(Configuration conf) { + return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED, + DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED); + } + + /** + * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache + * is expired. + */ + public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() { + if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) { + try { + LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo); + computeBlockDistribution(); + } catch (IOException e) { + LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.", + fileInfo, e); + } + } + return hdfsBlocksDistribution; + } + + private void computeBlockDistribution() throws IOException { + lastCachedAt = EnvironmentEdgeManager.currentTime(); + + FSDataInputStream stream; + if (fileInfo.isLink()) { + stream = FileLink.getUnderlyingFileLinkInputStream(this.stream); + } else { + stream = this.stream; + } + + if (!(stream instanceof HdfsDataInputStream)) { + if (!streamUnsupported) { + LOG.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be " + + "used to derive locality. Falling back on cached value.", + stream, fileInfo, fileInfo.isLink()); + streamUnsupported = true; + } + return; + } + + streamUnsupported = false; + hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream); + } + + /** + * For tests only, sets lastCachedAt so we can force a refresh + */ + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + synchronized void setLastCachedAt(long timestamp) { + lastCachedAt = timestamp; + } + + /** + * For tests only, returns the configured cache period + */ + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + long getCachePeriodMs() { + return cachePeriodMs; + } + + /** + * For tests only, returns whether the passed stream is supported + */ + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + boolean isStreamUnsupported() { + return streamUnsupported; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 1156b1768bb1..461170de6a15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -81,6 +81,9 @@ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.Progressable; @@ -703,6 +706,38 @@ public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOExc return fs.exists(metaRegionDir); } + /** + * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams + * are backed by a series of LocatedBlocks, which are fetched periodically from the namenode. + * This method retrieves those blocks from the input stream and uses them to calculate + * HDFSBlockDistribution. + * + * The underlying method in DFSInputStream does attempt to use locally cached blocks, but + * may hit the namenode if the cache is determined to be incomplete. The method also involves + * making copies of all LocatedBlocks rather than return the underlying blocks themselves. + */ + static public HDFSBlocksDistribution computeHDFSBlocksDistribution( + HdfsDataInputStream inputStream) throws IOException { + List blocks = inputStream.getAllBlocks(); + HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); + for (LocatedBlock block : blocks) { + String[] hosts = getHostsForLocations(block); + long len = block.getBlockSize(); + StorageType[] storageTypes = block.getStorageTypes(); + blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); + } + return blocksDistribution; + } + + private static String[] getHostsForLocations(LocatedBlock block) { + DatanodeInfo[] locations = block.getLocations(); + String[] hosts = new String[locations.length]; + for (int i = 0; i < hosts.length; i++) { + hosts[i] = locations[i].getHostName(); + } + return hosts; + } + /** * Compute HDFS blocks distribution of a given file, or a portion of the file * @param fs file system diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java index 1836960d0ea2..6a85f9864288 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.ipc.RemoteException; import org.junit.ClassRule; import org.junit.Test; @@ -88,6 +89,40 @@ public void testHashCode() { assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering } + /** + * Test that the returned link from {@link FileLink#open(FileSystem)} can be unwrapped + * to a {@link HdfsDataInputStream} by + * {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)} + */ + @Test + public void testGetUnderlyingFSDataInputStream() throws Exception { + HBaseTestingUtil testUtil = new HBaseTestingUtil(); + Configuration conf = testUtil.getConfiguration(); + conf.setInt("dfs.blocksize", 1024 * 1024); + conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024); + + testUtil.startMiniDFSCluster(1); + try { + MiniDFSCluster cluster = testUtil.getDFSCluster(); + FileSystem fs = cluster.getFileSystem(); + + Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file"); + + writeSomeData(fs, originalPath, 256 << 20, (byte) 2); + + List files = new ArrayList(); + files.add(originalPath); + + FileLink link = new FileLink(files); + FSDataInputStream stream = link.open(fs); + + FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream); + assertTrue(underlying instanceof HdfsDataInputStream); + } finally { + testUtil.shutdownMiniCluster(); + } + } + /** * Test, on HDFS, that the FileLink is still readable * even when the current file gets renamed. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java new file mode 100644 index 000000000000..2c7872ad89d0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.io.FileLink; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class}) +public class TestInputStreamBlockDistribution { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestInputStreamBlockDistribution.class); + + private Configuration conf; + private FileSystem fs; + private Path testPath; + + @Before + public void setUp() throws Exception { + HBaseTestingUtil testUtil = new HBaseTestingUtil(); + conf = testUtil.getConfiguration(); + conf.setInt("dfs.blocksize", 1024 * 1024); + conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024); + + testUtil.startMiniDFSCluster(1); + MiniDFSCluster cluster = testUtil.getDFSCluster(); + fs = cluster.getFileSystem(); + + testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file"); + + writeSomeData(fs, testPath, 256 << 20, (byte)2); + } + + @After + public void tearDown() throws Exception { + fs.delete(testPath, false); + fs.close(); + } + + @Test + public void itDerivesLocalityFromHFileInputStream() throws Exception { + try (FSDataInputStream stream = fs.open(testPath)) { + HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); + InputStreamBlockDistribution test = + new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false)); + + assertSame(initial, test.getHDFSBlockDistribution()); + + test.setLastCachedAt(test.getCachePeriodMs() + 1); + + assertNotSame(initial, test.getHDFSBlockDistribution()); + } + + } + + @Test + public void itDerivesLocalityFromFileLinkInputStream() throws Exception { + List files = new ArrayList(); + files.add(testPath); + + FileLink link = new FileLink(files); + try (FSDataInputStream stream = link.open(fs)) { + + HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); + + InputStreamBlockDistribution test = new InputStreamBlockDistribution(stream, + getMockedStoreFileInfo(initial, true)); + + assertSame(initial, test.getHDFSBlockDistribution()); + + test.setLastCachedAt(test.getCachePeriodMs() + 1); + + assertNotSame(initial, test.getHDFSBlockDistribution()); + } + } + + @Test + public void itFallsBackOnLastKnownValueWhenUnsupported() { + FSDataInputStream fakeStream = mock(FSDataInputStream.class); + HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); + + InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream, + getMockedStoreFileInfo(initial, false)); + + assertSame(initial, test.getHDFSBlockDistribution()); + test.setLastCachedAt(test.getCachePeriodMs() + 1); + + // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve + assertSame(initial, test.getHDFSBlockDistribution()); + assertTrue(test.isStreamUnsupported()); + } + + @Test + public void itFallsBackOnLastKnownValueOnException() throws IOException { + HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class); + when(fakeStream.getAllBlocks()).thenThrow(new IOException("test")); + + HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); + + InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream, + getMockedStoreFileInfo(initial, false)); + + assertSame(initial, test.getHDFSBlockDistribution()); + test.setLastCachedAt(test.getCachePeriodMs() + 1); + + // fakeStream throws an exception, so falls back on original + assertSame(initial, test.getHDFSBlockDistribution()); + + assertFalse(test.isStreamUnsupported()); + } + + /** + * Write up to 'size' bytes with value 'v' into a new file called 'path'. + */ + private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException { + byte[] data = new byte[4096]; + for (int i = 0; i < data.length; i++) { + data[i] = v; + } + + FSDataOutputStream stream = fs.create(path); + try { + long written = 0; + while (written < size) { + stream.write(data, 0, data.length); + written += data.length; + } + } finally { + stream.close(); + } + } + + private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution, + boolean isFileLink) { + StoreFileInfo mock = mock(StoreFileInfo.class); + when(mock.getHDFSBlockDistribution()) + .thenReturn(distribution); + when(mock.getConf()).thenReturn(conf); + when(mock.isLink()).thenReturn(isFileLink); + return mock; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java index 16d4456fc18f..f4a4df53895d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; @@ -105,7 +106,31 @@ private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) out.close(); } - @Test public void testcomputeHDFSBlocksDistribution() throws Exception { + @Test + public void testComputeHDFSBlocksDistributionByInputStream() throws Exception { + testComputeHDFSBlocksDistribution((fs, testFile) -> { + try (FSDataInputStream open = fs.open(testFile)) { + assertTrue(open instanceof HdfsDataInputStream); + return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open); + } + }); + } + + @Test + public void testComputeHDFSBlockDistribution() throws Exception { + testComputeHDFSBlocksDistribution((fs, testFile) -> { + FileStatus status = fs.getFileStatus(testFile); + return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); + }); + } + + @FunctionalInterface + interface HDFSBlockDistributionFunction { + HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException; + } + + private void testComputeHDFSBlocksDistribution( + HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception { final int DEFAULT_BLOCK_SIZE = 1024; conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); MiniDFSCluster cluster = null; @@ -129,9 +154,10 @@ private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) boolean ok; do { ok = true; - FileStatus status = fs.getFileStatus(testFile); + HDFSBlocksDistribution blocksDistribution = - FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); + fileToBlockDistribution.getForPath(fs, testFile); + long uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); for (String host : hosts) { @@ -163,9 +189,8 @@ private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) long weight; long uniqueBlocksTotalWeight; do { - FileStatus status = fs.getFileStatus(testFile); HDFSBlocksDistribution blocksDistribution = - FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); + fileToBlockDistribution.getForPath(fs, testFile); uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); String tophost = blocksDistribution.getTopHosts().get(0); @@ -197,8 +222,7 @@ private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) final long maxTime = EnvironmentEdgeManager.currentTime() + 2000; HDFSBlocksDistribution blocksDistribution; do { - FileStatus status = fs.getFileStatus(testFile); - blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); + blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile); // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175 } while (blocksDistribution.getTopHosts().size() != 3 &&