diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index cc34c2a513..ce57978ca1 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -62,7 +62,6 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -903,8 +902,6 @@ class Shuffle extends ChannelInboundHandlerAdapter { private static final int ALLOWED_CONCURRENCY = 16; private final Configuration conf; private final IndexCache indexCache; - private final LocalDirAllocator lDirAlloc = - new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); private int port; private final LoadingCache pathCache = CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES, @@ -937,10 +934,10 @@ public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception { String base = getBaseLocation(key.jobId, key.dagId, key.user); String attemptBase = base + key.attemptId; - Path indexFileName = lDirAlloc.getLocalPathToRead( - attemptBase + Path.SEPARATOR + INDEX_FILE_NAME, conf); - Path mapOutputFileName = lDirAlloc.getLocalPathToRead( - attemptBase + Path.SEPARATOR + DATA_FILE_NAME, conf); + Path indexFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME); + Path mapOutputFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME); LOG.debug("Loaded : {} via loader", key); return new AttemptPathInfo(indexFileName, mapOutputFileName); @@ -1154,7 +1151,7 @@ private boolean deleteDagDirectories(Channel channel, String base = getDagLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0))); try { FileContext lfc = FileContext.getLocalFSFileContext(); - for(Path dagPath : lDirAlloc.getAllLocalPathsToRead(base, conf)) { + for(Path dagPath : getAuxiliaryLocalPathHandler().getAllLocalPathsForRead(base)) { lfc.delete(dagPath, true); } } catch (IOException e) { @@ -1196,7 +1193,7 @@ private boolean deleteTaskAttemptDirectories(Channel channel, List taskA String baseStr = getBaseLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0))); try { FileSystem fs = FileSystem.getLocal(conf).getRaw(); - for (Path basePath : lDirAlloc.getAllLocalPathsToRead(baseStr, conf)) { + for (Path basePath : getAuxiliaryLocalPathHandler().getAllLocalPathsForRead(baseStr)) { for (FileStatus fileStatus : fs.listStatus(basePath)) { Path taskAttemptPath = fileStatus.getPath(); if (taskAttemptPath.getName().startsWith(taskAttemptId)) { @@ -1293,7 +1290,7 @@ private String getBaseLocation(String jobId, String dagId, String user) { private void deleteTaskDirsOfVertex(String jobId, String dagId, String vertexId, String user) throws IOException { String baseStr = getBaseLocation(jobId, dagId, user); FileContext lfc = FileContext.getLocalFSFileContext(); - for(Path dagPath : lDirAlloc.getAllLocalPathsToRead(baseStr, conf)) { + for(Path dagPath : getAuxiliaryLocalPathHandler().getAllLocalPathsForRead(baseStr)) { RemoteIterator status = lfc.listStatus(dagPath); final JobID jobID = JobID.forName(jobId); String taskDirPrefix = String.format("attempt%s_%s_%s_", diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index d0d0a381e7..d32998afcc 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -61,6 +62,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.tez.runtime.library.common.security.SecureShuffleUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; @@ -114,6 +116,7 @@ public class TestShuffleHandler { private static final String TEST_PARTITION_DATA_STRING = "0123456789"; class MockShuffleHandler extends org.apache.tez.auxservices.ShuffleHandler { + private AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); @Override protected Shuffle getShuffle(final Configuration conf) { return new Shuffle(conf) { @@ -156,8 +159,38 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, } }; } + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } } + private static class TestAuxiliaryLocalPathHandler + implements AuxiliaryLocalPathHandler { + @Override + public Path getLocalPathForRead(String path) throws IOException { + return new Path(TEST_DIR.getAbsolutePath(), path); + } + + @Override + public Path getLocalPathForWrite(String path) throws IOException { + return new Path(TEST_DIR.getAbsolutePath()); + } + + @Override + public Path getLocalPathForWrite(String path, long size) + throws IOException { + return new Path(TEST_DIR.getAbsolutePath()); + } + + @Override + public Iterable getAllLocalPathsForRead(String path) + throws IOException { + ArrayList paths = new ArrayList<>(); + paths.add(new Path(TEST_DIR.getAbsolutePath(), path)); + return paths; + } + } private static class MockShuffleHandler2 extends org.apache.tez.auxservices.ShuffleHandler { boolean socketKeepAlive = false; @@ -541,6 +574,11 @@ public void testSocketKeepAlive() throws Exception { conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); HttpURLConnection conn = null; MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(); + AuxiliaryLocalPathHandler pathHandler = + mock(AuxiliaryLocalPathHandler.class); + when(pathHandler.getLocalPathForRead(anyString())).thenThrow( + new IOException("Test")); + shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler); try { shuffleHandler.init(conf); shuffleHandler.start(); @@ -721,9 +759,7 @@ public void testRangedFetch() throws IOException { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", - TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, TEST_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); LOG.info(appId.toString()); String appAttemptId = "attempt_12345_1_m_1_0"; @@ -731,7 +767,7 @@ public void testRangedFetch() throws IOException { String reducerIdStart = "0"; String reducerIdEnd = "1"; List fileMap = new ArrayList<>(); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(TEST_DIR, user, appId.toString(), appAttemptId, conf, fileMap); ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify(); shuffleHandler.init(conf); @@ -786,7 +822,7 @@ public void testRangedFetch() throws IOException { } finally { shuffleHandler.close(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(TEST_DIR); } } @@ -825,15 +861,14 @@ private void runMultiAttemptMultiRangeShuffleTest(int attemptRange, int reduceRa conf.setInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, 3); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, TEST_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); LOG.info(appId.toString()); List attemptIds = IntStream.range(firstAttempt, firstAttempt + attemptRange) .mapToObj(i -> "attempt_12345_1_m_" + i + "_0").collect(Collectors.toList()); List fileMap = new ArrayList<>(); for (String attemptId : attemptIds) { - createShuffleHandlerFiles(absLogDir, user, appId.toString(), attemptId, conf, fileMap, reducerIdStart, + createShuffleHandlerFiles(TEST_DIR, user, appId.toString(), attemptId, conf, fileMap, reducerIdStart, reducerIdEnd); } ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify(); @@ -884,7 +919,7 @@ private void runMultiAttemptMultiRangeShuffleTest(int attemptRange, int reduceRa } finally { shuffleHandler.close(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(TEST_DIR); } } @@ -903,16 +938,14 @@ public void testMapFileAccess() throws IOException { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", - TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, TEST_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); LOG.info(appId.toString()); String appAttemptId = "attempt_12345_1_m_1_0"; String user = "randomUser"; String reducerId = "0"; List fileMap = new ArrayList(); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(TEST_DIR, user, appId.toString(), appAttemptId, conf, fileMap); ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify(); shuffleHandler.init(conf); @@ -959,7 +992,7 @@ public void testMapFileAccess() throws IOException { Assert.assertTrue((new String(byteArr)).contains(message)); } finally { shuffleHandler.close(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(TEST_DIR); } } @@ -1197,17 +1230,16 @@ public void testGetMapOutputInfo() throws Exception { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", TestShuffleHandler.class. - getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, TEST_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); String appAttemptId = "attempt_12345_1_m_1_0"; String user = "randomUser"; String reducerId = "0"; List fileMap = new ArrayList(); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(TEST_DIR, user, appId.toString(), appAttemptId, conf, fileMap); ShuffleHandler shuffleHandler = new ShuffleHandler() { + private AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); @Override protected Shuffle getShuffle(Configuration conf) { // replace the shuffle handler with one stubbed for testing @@ -1248,6 +1280,10 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, } }; } + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } }; shuffleHandler.init(conf); try { @@ -1287,7 +1323,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, false, failureEncountered.get()); } finally { shuffleHandler.close(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(TEST_DIR); } } @@ -1299,16 +1335,15 @@ public void testDagDelete() throws Exception { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", TestShuffleHandler.class. - getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, TEST_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); String appAttemptId = "attempt_12345_1_m_1_0"; String user = "randomUser"; List fileMap = new ArrayList(); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(TEST_DIR, user, appId.toString(), appAttemptId, conf, fileMap); ShuffleHandler shuffleHandler = new ShuffleHandler() { + private AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); @Override protected Shuffle getShuffle(Configuration conf) { // replace the shuffle handler with one stubbed for testing @@ -1322,6 +1357,10 @@ protected void sendError(ChannelHandlerContext ctx, String message, } }; } + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } }; shuffleHandler.init(conf); try { @@ -1349,7 +1388,7 @@ protected void sendError(ChannelHandlerContext ctx, String message, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); String dagDirStr = StringUtils.join(Path.SEPARATOR, - new String[] { absLogDir.getAbsolutePath(), + new String[] { TEST_DIR.getAbsolutePath(), ShuffleHandler.USERCACHE, user, ShuffleHandler.APPCACHE, appId.toString(),"dag_1/"}); File dagDir = new File(dagDirStr); @@ -1366,7 +1405,7 @@ protected void sendError(ChannelHandlerContext ctx, String message, false, failureEncountered.get()); } finally { shuffleHandler.close(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(TEST_DIR); } } @@ -1378,24 +1417,28 @@ public void testVertexShuffleDelete() throws Exception { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", TestShuffleHandler.class. - getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, TEST_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345L, 1); String appAttemptId = "attempt_12345_0001_1_00_000000_0_10003_0"; String user = "randomUser"; List fileMap = new ArrayList(); - String vertexDirStr = StringUtils.join(Path.SEPARATOR, new String[] { absLogDir.getAbsolutePath(), + String vertexDirStr = StringUtils.join(Path.SEPARATOR, new String[] { TEST_DIR.getAbsolutePath(), ShuffleHandler.USERCACHE, user, ShuffleHandler.APPCACHE, appId.toString(), "dag_1/output/" + appAttemptId}); File vertexDir = new File(vertexDirStr); Assert.assertFalse("vertex directory should not be present", vertexDir.exists()); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(TEST_DIR, user, appId.toString(), appAttemptId, conf, fileMap); ShuffleHandler shuffleHandler = new ShuffleHandler() { + private AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); @Override protected Shuffle getShuffle(Configuration conf) { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, + HttpResponse response, URL requestUri) throws IOException { + // Do nothing. + } @Override protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { @@ -1406,6 +1449,10 @@ protected void sendError(ChannelHandlerContext ctx, String message, } }; } + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } }; shuffleHandler.init(conf); try { @@ -1442,7 +1489,7 @@ protected void sendError(ChannelHandlerContext ctx, String message, } } finally { shuffleHandler.close(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(TEST_DIR); } } @@ -1454,27 +1501,31 @@ public void testFailedTaskAttemptDelete() throws Exception { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", TestShuffleHandler.class. - getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, TEST_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); String appAttemptId = "attempt_12345_1_m_1_0"; String user = "randomUser"; List fileMap = new ArrayList(); String taskAttemptDirStr = StringUtils.join(Path.SEPARATOR, - new String[] {absLogDir.getAbsolutePath(), + new String[] {TEST_DIR.getAbsolutePath(), ShuffleHandler.USERCACHE, user, ShuffleHandler.APPCACHE, appId.toString(), "dag_1/output/", appAttemptId}); File taskAttemptDir = new File(taskAttemptDirStr); Assert.assertFalse("Task Attempt Directory should not exist", taskAttemptDir.exists()); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(TEST_DIR, user, appId.toString(), appAttemptId, conf, fileMap); ShuffleHandler shuffleHandler = new ShuffleHandler() { + private AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); @Override protected Shuffle getShuffle(Configuration conf) { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, + HttpResponse response, URL requestUri) throws IOException { + // Do nothing. + } @Override protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { @@ -1485,6 +1536,10 @@ protected void sendError(ChannelHandlerContext ctx, String message, } }; } + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } }; shuffleHandler.init(conf); try { @@ -1523,7 +1578,7 @@ protected void sendError(ChannelHandlerContext ctx, String message, 0, failures.size()); } finally { shuffleHandler.close(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(TEST_DIR); } } @@ -1685,6 +1740,7 @@ private Configuration getInitialConf() { private ShuffleHandler getShuffleHandlerWithNoVerify() { return new ShuffleHandler() { + private AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); @Override protected Shuffle getShuffle(Configuration conf) { @@ -1697,6 +1753,10 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpReques } }; } + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } }; } }