diff --git a/LICENSE-binary b/LICENSE-binary index e2f61dc7cd84c..3a0e19c5824de 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -339,20 +339,20 @@ org.apache.solr:solr-solrj:8.11.2 org.apache.yetus:audience-annotations:0.5.0 org.apache.zookeeper:zookeeper:3.7.2 org.codehaus.jettison:jettison:1.5.4 -org.eclipse.jetty:jetty-annotations:9.4.51.v20230217 -org.eclipse.jetty:jetty-http:9.4.51.v20230217 -org.eclipse.jetty:jetty-io:9.4.51.v20230217 -org.eclipse.jetty:jetty-jndi:9.4.51.v20230217 -org.eclipse.jetty:jetty-plus:9.4.51.v20230217 -org.eclipse.jetty:jetty-security:9.4.51.v20230217 -org.eclipse.jetty:jetty-server:9.4.51.v20230217 -org.eclipse.jetty:jetty-servlet:9.4.51.v20230217 -org.eclipse.jetty:jetty-util:9.4.51.v20230217 -org.eclipse.jetty:jetty-util-ajax:9.4.51.v20230217 -org.eclipse.jetty:jetty-webapp:9.4.51.v20230217 -org.eclipse.jetty:jetty-xml:9.4.51.v20230217 -org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.51.v20230217 -org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.51.v20230217 +org.eclipse.jetty:jetty-annotations:9.4.53.v20231009 +org.eclipse.jetty:jetty-http:9.4.53.v20231009 +org.eclipse.jetty:jetty-io:9.4.53.v20231009 +org.eclipse.jetty:jetty-jndi:9.4.53.v20231009 +org.eclipse.jetty:jetty-plus:9.4.53.v20231009 +org.eclipse.jetty:jetty-security:9.4.53.v20231009 +org.eclipse.jetty:jetty-server:9.4.53.v20231009 +org.eclipse.jetty:jetty-servlet:9.4.53.v20231009 +org.eclipse.jetty:jetty-util:9.4.53.v20231009 +org.eclipse.jetty:jetty-util-ajax:9.4.53.v20231009 +org.eclipse.jetty:jetty-webapp:9.4.53.v20231009 +org.eclipse.jetty:jetty-xml:9.4.53.v20231009 +org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.53.v20231009 +org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.53.v20231009 org.ehcache:ehcache:3.3.1 org.ini4j:ini4j:0.5.4 org.lz4:lz4-java:1.7.1 diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java index 487d7b9409159..91f720a49eed9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -660,6 +660,7 @@ public Void run() throws Exception { private void doGracefulFailover() throws ServiceFailedException, IOException, InterruptedException { int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2; + Preconditions.checkArgument(timeout >= 0, "timeout should be non-negative."); // Phase 1: pre-flight checks checkEligibleForFailover(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java index 107333c5a63c3..3a3898727fc10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java @@ -257,7 +257,7 @@ public void testOfflineImageViewer() throws Exception { FSImageTestUtil.getFSImage( cluster.getNameNode()).getStorage().getStorageDir(0)); assertNotNull("Didn't generate or can't find fsimage", originalFsimage); - PrintStream o = new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM); + PrintStream o = new PrintStream(NullOutputStream.INSTANCE); PBImageXmlWriter v = new PBImageXmlWriter(new Configuration(), o); v.visit(new RandomAccessFile(originalFsimage, "r")); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java index 
c24c9132cbcd5..b2112a74e8553 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java @@ -405,7 +405,7 @@ private static FileStatus pathToFileEntry(FileSystem hdfs, String file) @Test(expected = IOException.class) public void testTruncatedFSImage() throws IOException { File truncatedFile = new File(tempDir, "truncatedFsImage"); - PrintStream output = new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM); + PrintStream output = new PrintStream(NullOutputStream.INSTANCE); copyPartOfFile(originalFsimage, truncatedFile); try (RandomAccessFile r = new RandomAccessFile(truncatedFile, "r")) { new FileDistributionCalculator(new Configuration(), 0, 0, false, output) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 06d9fbbe7a323..4f86f912838fa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -955,7 +955,10 @@ public static class MapOutputBuffer new ArrayList(); private int totalIndexCacheMemory; private int indexCacheMemoryLimit; + private int spillFilesCountLimit; private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024; + private static final int SPILL_FILES_COUNT_LIMIT_DEFAULT = -1; + private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1; private MapTask mapTask; private MapOutputFile mapOutputFile; @@ -984,10 +987,17 @@ public void init(MapOutputCollector.Context context 
MRJobConfig.DEFAULT_IO_SORT_MB); indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, INDEX_CACHE_MEMORY_LIMIT_DEFAULT); + spillFilesCountLimit = job.getInt(JobContext.SPILL_FILES_COUNT_LIMIT, + SPILL_FILES_COUNT_LIMIT_DEFAULT); if (spillper > (float)1.0 || spillper <= (float)0.0) { throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT + "\": " + spillper); } + if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE + && spillFilesCountLimit < 0) { + throw new IOException("Invalid value for \"" + JobContext.SPILL_FILES_COUNT_LIMIT + "\", " + + "current value: " + spillFilesCountLimit); + } if ((sortmb & 0x7FF) != sortmb) { throw new IOException( "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb); @@ -1698,7 +1708,7 @@ private void sortAndSpill() throws IOException, ClassNotFoundException, spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } LOG.info("Finished spill " + numSpills); - ++numSpills; + incrementNumSpills(); } finally { if (out != null) out.close(); if (partitionOut != null) { @@ -1774,7 +1784,7 @@ private void spillSingleRecord(final K key, final V value, totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } - ++numSpills; + incrementNumSpills(); } finally { if (out != null) out.close(); if (partitionOut != null) { @@ -2022,7 +2032,7 @@ private void sameVolRename(Path srcPath, if (!dst.getParentFile().exists()) { if (!dst.getParentFile().mkdirs()) { throw new IOException("Unable to rename " + src + " to " - + dst + ": couldn't create parent directory"); + + dst + ": couldn't create parent directory"); } } @@ -2030,6 +2040,21 @@ private void sameVolRename(Path srcPath, throw new IOException("Unable to rename " + src + " to " + dst); } } + + /** + * Increments numSpills local counter by taking into consideration + * the max limit on spill files being generated by the job. 
+ * If limit is reached, this function throws an IOException + */ + private void incrementNumSpills() throws IOException { + ++numSpills; + if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE + && numSpills > spillFilesCountLimit) { + throw new IOException("Too many spill files got created, control it with " + + "mapreduce.task.spill.files.count.limit, current value: " + spillFilesCountLimit + + ", current spill count: " + numSpills); + } + } } // MapOutputBuffer /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 8ec984e777bb6..289159ad922a7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -323,6 +323,7 @@ public interface MRJobConfig { public static final int DEFAULT_IO_SORT_MB = 100; public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes"; + String SPILL_FILES_COUNT_LIMIT = "mapreduce.task.spill.files.count.limit"; public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 9b0d8b563d7bd..ca144a7b156cc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -62,6 +62,15 @@ set to less than .5 + + mapreduce.task.spill.files.count.limit + -1 + Number of spill files that can be created by a MapTask. + After breaching this, task will fail. Default value for this config is -1 + which indicates that there is no limit on number of spill files being + created + + mapreduce.job.local-fs.single-disk-limit.bytes -1 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java index d5164de46d12a..fef179994f09a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java @@ -27,14 +27,20 @@ import org.apache.hadoop.mapred.MapTask.MapOutputBuffer; import org.apache.hadoop.mapred.Task.TaskReporter; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.util.Progress; import org.junit.After; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; @@ -51,6 +57,9 @@ public void cleanup() throws Exception { FileUtil.fullyDelete(TEST_ROOT_DIR); } + @Rule + public ExpectedException exception = ExpectedException.none(); + // Verify output files 
for shuffle have group read permission even when // the configured umask normally would prevent it. @Test @@ -84,4 +93,73 @@ public void testShufflePermissions() throws Exception { Assert.assertEquals("Incorrect index file perms", (short)0640, perms.toShort()); } + + @Test + public void testSpillFilesCountLimitInvalidValue() throws Exception { + JobConf conf = new JobConf(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath()); + conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, -2); + MapOutputFile mof = new MROutputFiles(); + mof.setConf(conf); + TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1); + MapTask mockTask = mock(MapTask.class); + doReturn(mof).when(mockTask).getMapOutputFile(); + doReturn(attemptId).when(mockTask).getTaskID(); + doReturn(new Progress()).when(mockTask).getSortPhase(); + TaskReporter mockReporter = mock(TaskReporter.class); + doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class)); + MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter); + MapOutputBuffer mob = new MapOutputBuffer<>(); + + exception.expect(IOException.class); + exception.expectMessage("Invalid value for \"mapreduce.task.spill.files.count.limit\", " + + "current value: -2"); + + mob.init(ctx); + mob.close(); + } + + @Test + public void testSpillFilesCountBreach() throws Exception { + JobConf conf = new JobConf(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath()); + conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, 2); + MapOutputFile mof = new MROutputFiles(); + mof.setConf(conf); + TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1); + MapTask mockTask = mock(MapTask.class); + doReturn(mof).when(mockTask).getMapOutputFile(); + doReturn(attemptId).when(mockTask).getTaskID(); + doReturn(new 
Progress()).when(mockTask).getSortPhase(); + TaskReporter mockReporter = mock(TaskReporter.class); + doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class)); + MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter); + MapOutputBuffer mob = new MapOutputBuffer<>(); + mob.numSpills = 2; + mob.init(ctx); + + Method method = mob.getClass().getDeclaredMethod("incrementNumSpills"); + method.setAccessible(true); + boolean gotExceptionWithMessage = false; + try { + method.invoke(mob); + } catch(InvocationTargetException e) { + Throwable targetException = e.getTargetException(); + if (targetException != null) { + String errorMessage = targetException.getMessage(); + if (errorMessage != null) { + if(errorMessage.equals("Too many spill files got created, control it with " + + "mapreduce.task.spill.files.count.limit, current value: 2, current spill count: 3")) { + gotExceptionWithMessage = true; + } + } + } + } + + mob.close(); + + Assert.assertTrue(gotExceptionWithMessage); + } } diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 25e48f293a64a..5b1c569d21de6 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -37,7 +37,7 @@ true true - 9.4.51.v20230217 + 9.4.53.v20231009 _ _ diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java index f745a2e519e3c..9b28ca406d693 100644 --- a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java +++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java @@ -54,6 +54,7 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -505,7 +506,10 @@ void generateScript(File localScript) throws IOException { 
String classpath = halrJarPath + File.pathSeparator + harJarPath; FileWriterWithEncoding fw = null; try { - fw = new FileWriterWithEncoding(localScript, "UTF-8"); + fw = FileWriterWithEncoding.builder() + .setFile(localScript) + .setCharset(StandardCharsets.UTF_8) + .get(); fw.write("#!/bin/bash\nset -e\nset -x\n"); int containerCount = 1; for (AppInfo context : eligibleApplications) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 477a8a293cebf..26c3e01a45d03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -32,6 +32,7 @@ import java.io.PrintStream; import java.io.Writer; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -783,7 +784,10 @@ public static void readAcontainerLogs(DataInputStream valueStream, OutputStream os = null; PrintStream ps = null; try { - os = new WriterOutputStream(writer, Charset.forName("UTF-8")); + os = WriterOutputStream.builder() + .setWriter(writer) + .setCharset(StandardCharsets.UTF_8) + .get(); ps = new PrintStream(os); while (true) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c2be5d420bfcf..0e317712f826c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ 
-5132,10 +5132,10 @@ yarn.router.interceptor.user-thread-pool.keep-alive-time - 0s + 30s This configurable is used to set the keepAliveTime of the thread pool of the interceptor. - Default is 0s. + Default is 30s. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java index c7166a1268c55..1e550890fda8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java @@ -496,8 +496,9 @@ void testLevelDbRepair() throws IOException { store.init(conf); Mockito.verify(factory, Mockito.times(1)) .repair(Mockito.any(File.class), Mockito.any(Options.class)); - FileFilter fileFilter = new WildcardFileFilter( - "*" + LeveldbTimelineStore.BACKUP_EXT + "*"); + FileFilter fileFilter = WildcardFileFilter.builder() + .setWildcards("*" + LeveldbTimelineStore.BACKUP_EXT +"*") + .get(); assertTrue(path.listFiles(fileFilter).length > 0); } finally { store.close(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java index b20497728bab1..6d37a9730e1d5 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java @@ -444,8 +444,9 @@ void testLevelDbRepair() throws IOException { store.init(conf); Mockito.verify(factory, Mockito.times(1)) .repair(Mockito.any(File.class), Mockito.any(Options.class)); - FilenameFilter fileFilter = - new WildcardFileFilter("*" + RollingLevelDBTimelineStore.BACKUP_EXT + "*"); + FilenameFilter fileFilter = WildcardFileFilter.builder() + .setWildcards("*" + RollingLevelDBTimelineStore.BACKUP_EXT + "*") + .get(); assertTrue(new File(path.getAbsolutePath(), RollingLevelDBTimelineStore.FILENAME) .list(fileFilter).length > 0); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java index af0bd6184b797..76380af8c986e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; import java.util.HashSet; -import java.util.List; import java.util.Set; import 
org.apache.commons.lang3.time.DurationFormatUtils; @@ -95,6 +94,10 @@ public GPGContext getGPGContext() { return this.gpgContext; } + public FederationRegistryClient getRegistryClient() { + return this.registryClient; + } + /** * Query router for applications. * @@ -152,18 +155,6 @@ public Set getRouterKnownApplications() throws YarnException { + " success Router queries after " + totalAttemptCount + " retries"); } - protected void cleanupAppRecordInRegistry(Set knownApps) { - List allApps = this.registryClient.getAllApplications(); - LOG.info("Got {} existing apps in registry.", allApps.size()); - for (String app : allApps) { - ApplicationId appId = ApplicationId.fromString(app); - if (!knownApps.contains(appId)) { - LOG.info("removing finished application entry for {}", app); - this.registryClient.removeAppFromRegistry(appId, true); - } - } - } - @Override public abstract void run(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java index 5b2ff26fcfb4d..c3f79d0284c5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.util.Sets; import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -45,33 +46,49 @@ public void run() { LOG.info("Application cleaner run at time {}", now); FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade(); - Set candidates = new HashSet<>(); try { + // Get the candidate list from StateStore before calling router + Set allStateStoreApps = new HashSet<>(); List response = facade.getApplicationsHomeSubCluster(); for (ApplicationHomeSubCluster app : response) { - candidates.add(app.getApplicationId()); + allStateStoreApps.add(app.getApplicationId()); } - LOG.info("{} app entries in FederationStateStore", candidates.size()); + LOG.info("{} app entries in FederationStateStore", allStateStoreApps.size()); + // Get the candidate list from Registry before calling router + List allRegistryApps = getRegistryClient().getAllApplications(); + LOG.info("{} app entries in FederationRegistry", allRegistryApps.size()); + + // Get the list of known apps from Router Set routerApps = getRouterKnownApplications(); LOG.info("{} known applications from Router", routerApps.size()); - candidates.removeAll(routerApps); - LOG.info("Deleting {} applications from statestore", candidates.size()); - if (LOG.isDebugEnabled()) { - LOG.debug("Apps to delete: {}.", candidates.stream().map(Object::toString) - .collect(Collectors.joining(","))); - } - for (ApplicationId appId : candidates) { + // Clean up StateStore entries + Set toDelete = + Sets.difference(allStateStoreApps, routerApps); + + LOG.info("Deleting {} applications from statestore", toDelete.size()); + LOG.debug("Apps to delete: {}.", + toDelete.stream().map(Object::toString).collect(Collectors.joining(","))); + + for (ApplicationId appId : toDelete) { try { + LOG.debug("Deleting {} from statestore ", appId); facade.deleteApplicationHomeSubCluster(appId); } catch (Exception e) {
LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e); } } - // Clean up registry entries - cleanupAppRecordInRegistry(routerApps); + + // Clean up Registry entries + for (String app : allRegistryApps) { + ApplicationId appId = ApplicationId.fromString(app); + if (!routerApps.contains(appId)) { + LOG.debug("removing finished application entry for {}", app); + getRegistryClient().removeAppFromRegistry(appId, true); + } + } } catch (Throwable e) { LOG.error("Application cleaner started at time {} fails. ", now, e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java index 1e703b51960e2..c028bbdbe2c17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; @@ -63,6 +64,8 @@ public class TestDefaultApplicationCleaner { // The list of applications returned by mocked router private Set routerAppIds; + private ApplicationId appIdToAddConcurrently; + @Before public void setup() throws Exception { conf = new YarnConfiguration(); @@ -111,6 +114,7 @@ public void setup() throws Exception { new Token()); } Assert.assertEquals(3, registryClient.getAllApplications().size()); + appIdToAddConcurrently = null; } @After @@ -159,7 +163,42 @@ public class TestableDefaultApplicationCleaner extends DefaultApplicationCleaner { @Override public Set getAppsFromRouter() throws YarnRuntimeException { + if (appIdToAddConcurrently != null) { + SubClusterId scId = SubClusterId.newInstance("MySubClusterId"); + try { + ApplicationHomeSubCluster appHomeSubCluster = + ApplicationHomeSubCluster.newInstance(appIdToAddConcurrently, scId); + AddApplicationHomeSubClusterRequest request = + AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster); + stateStore.addApplicationHomeSubCluster(request); + } catch (YarnException e) { + throw new YarnRuntimeException(e); + } + registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently, scId.toString(), + new Token<>()); + } return routerAppIds; } } + + @Test + public void testConcurrentNewApp() throws YarnException { + appIdToAddConcurrently = ApplicationId.newInstance(1, 1); + + appCleaner.run(); + + // The concurrently added app should be still there + GetApplicationsHomeSubClusterRequest appHomeSubClusterRequest = + GetApplicationsHomeSubClusterRequest.newInstance(); + GetApplicationsHomeSubClusterResponse applicationsHomeSubCluster = + stateStore.getApplicationsHomeSubCluster(appHomeSubClusterRequest); + Assert.assertNotNull(applicationsHomeSubCluster); + List appsHomeSubClusters = + applicationsHomeSubCluster.getAppsHomeSubClusters(); + 
Assert.assertNotNull(appsHomeSubClusters); + Assert.assertEquals(1, appsHomeSubClusters.size()); + + // The concurrently added app should be still there + Assert.assertEquals(1, registryClient.getAllApplications().size()); + } }