Merge branch 'apache:trunk' into YARN-11594
slfan1989 authored Oct 31, 2023
2 parents 10b1cca + f1ce273 commit c2da186
Showing 17 changed files with 224 additions and 53 deletions.
28 changes: 14 additions & 14 deletions LICENSE-binary
@@ -339,20 +339,20 @@ org.apache.solr:solr-solrj:8.11.2
org.apache.yetus:audience-annotations:0.5.0
org.apache.zookeeper:zookeeper:3.7.2
org.codehaus.jettison:jettison:1.5.4
org.eclipse.jetty:jetty-annotations:9.4.51.v20230217
org.eclipse.jetty:jetty-http:9.4.51.v20230217
org.eclipse.jetty:jetty-io:9.4.51.v20230217
org.eclipse.jetty:jetty-jndi:9.4.51.v20230217
org.eclipse.jetty:jetty-plus:9.4.51.v20230217
org.eclipse.jetty:jetty-security:9.4.51.v20230217
org.eclipse.jetty:jetty-server:9.4.51.v20230217
org.eclipse.jetty:jetty-servlet:9.4.51.v20230217
org.eclipse.jetty:jetty-util:9.4.51.v20230217
org.eclipse.jetty:jetty-util-ajax:9.4.51.v20230217
org.eclipse.jetty:jetty-webapp:9.4.51.v20230217
org.eclipse.jetty:jetty-xml:9.4.51.v20230217
org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.51.v20230217
org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.51.v20230217
org.eclipse.jetty:jetty-annotations:9.4.53.v20231009
org.eclipse.jetty:jetty-http:9.4.53.v20231009
org.eclipse.jetty:jetty-io:9.4.53.v20231009
org.eclipse.jetty:jetty-jndi:9.4.53.v20231009
org.eclipse.jetty:jetty-plus:9.4.53.v20231009
org.eclipse.jetty:jetty-security:9.4.53.v20231009
org.eclipse.jetty:jetty-server:9.4.53.v20231009
org.eclipse.jetty:jetty-servlet:9.4.53.v20231009
org.eclipse.jetty:jetty-util:9.4.53.v20231009
org.eclipse.jetty:jetty-util-ajax:9.4.53.v20231009
org.eclipse.jetty:jetty-webapp:9.4.53.v20231009
org.eclipse.jetty:jetty-xml:9.4.53.v20231009
org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.53.v20231009
org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.53.v20231009
org.ehcache:ehcache:3.3.1
org.ini4j:ini4j:0.5.4
org.lz4:lz4-java:1.7.1
@@ -660,6 +660,7 @@ public Void run() throws Exception {
private void doGracefulFailover()
throws ServiceFailedException, IOException, InterruptedException {
int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
Preconditions.checkArgument(timeout >= 0, "timeout should be non-negative.");

// Phase 1: pre-flight checks
checkEligibleForFailover();
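
A plausible reading of the new guard, sketched below as a standalone demo (class name and values are illustrative, not from this commit): doubling a sufficiently large configured graceful-fence timeout overflows int, and the added checkArgument now rejects the resulting negative value up front.

public class TimeoutOverflowDemo {
  public static void main(String[] args) {
    // Anything above Integer.MAX_VALUE / 2 overflows once doubled,
    // e.g. a pathological graceful-fence timeout setting.
    int configured = Integer.MAX_VALUE / 2 + 1;
    int timeout = configured * 2; // wraps around to a negative int
    System.out.println(timeout);  // prints -2147483648
    // Preconditions.checkArgument(timeout >= 0, ...) fails fast here
    // instead of letting a negative timeout reach the failover phases.
  }
}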
@@ -257,7 +257,7 @@ public void testOfflineImageViewer() throws Exception {
FSImageTestUtil.getFSImage(
cluster.getNameNode()).getStorage().getStorageDir(0));
assertNotNull("Didn't generate or can't find fsimage", originalFsimage);
PrintStream o = new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM);
PrintStream o = new PrintStream(NullOutputStream.INSTANCE);
PBImageXmlWriter v = new PBImageXmlWriter(new Configuration(), o);
v.visit(new RandomAccessFile(originalFsimage, "r"));
}
@@ -405,7 +405,7 @@ private static FileStatus pathToFileEntry(FileSystem hdfs, String file)
@Test(expected = IOException.class)
public void testTruncatedFSImage() throws IOException {
File truncatedFile = new File(tempDir, "truncatedFsImage");
PrintStream output = new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM);
PrintStream output = new PrintStream(NullOutputStream.INSTANCE);
copyPartOfFile(originalFsimage, truncatedFile);
try (RandomAccessFile r = new RandomAccessFile(truncatedFile, "r")) {
new FileDistributionCalculator(new Configuration(), 0, 0, false, output)
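
Both NullOutputStream hunks make the same mechanical migration: newer commons-io releases deprecate the NULL_OUTPUT_STREAM field in favor of the INSTANCE singleton, with identical discard-everything behavior. A minimal sketch (class name is illustrative):

import java.io.PrintStream;
import org.apache.commons.io.output.NullOutputStream;

public class DiscardingStreamDemo {
  public static void main(String[] args) {
    // Deprecated in newer commons-io:
    // PrintStream o = new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM);
    PrintStream o = new PrintStream(NullOutputStream.INSTANCE);
    o.println("silently dropped"); // all writes are discarded
    o.close();
  }
}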
@@ -955,7 +955,10 @@ public static class MapOutputBuffer<K extends Object, V extends Object>
new ArrayList<SpillRecord>();
private int totalIndexCacheMemory;
private int indexCacheMemoryLimit;
private int spillFilesCountLimit;
private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
private static final int SPILL_FILES_COUNT_LIMIT_DEFAULT = -1;
private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;

private MapTask mapTask;
private MapOutputFile mapOutputFile;
@@ -984,10 +987,17 @@ public void init(MapOutputCollector.Context context
MRJobConfig.DEFAULT_IO_SORT_MB);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
spillFilesCountLimit = job.getInt(JobContext.SPILL_FILES_COUNT_LIMIT,
SPILL_FILES_COUNT_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
}
if (spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
&& spillFilesCountLimit < 0) {
throw new IOException("Invalid value for \"" + JobContext.SPILL_FILES_COUNT_LIMIT + "\", " +
"current value: " + spillFilesCountLimit);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
@@ -1698,7 +1708,7 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
incrementNumSpills();
} finally {
if (out != null) out.close();
if (partitionOut != null) {
@@ -1774,7 +1784,7 @@ private void spillSingleRecord(final K key, final V value,
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
incrementNumSpills();
} finally {
if (out != null) out.close();
if (partitionOut != null) {
@@ -2022,14 +2032,29 @@ private void sameVolRename(Path srcPath,
if (!dst.getParentFile().exists()) {
if (!dst.getParentFile().mkdirs()) {
throw new IOException("Unable to rename " + src + " to "
+ dst + ": couldn't create parent directory");
+ dst + ": couldn't create parent directory");
}
}

if (!src.renameTo(dst)) {
throw new IOException("Unable to rename " + src + " to " + dst);
}
}

/**
* Increments the local numSpills counter, honoring the configured
* limit on the number of spill files the job may generate.
* Throws an IOException once the limit is breached.
*/
private void incrementNumSpills() throws IOException {
++numSpills;
if (spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
&& numSpills > spillFilesCountLimit) {
throw new IOException("Too many spill files got created, control it with " +
"mapreduce.task.spill.files.count.limit, current value: " + spillFilesCountLimit +
", current spill count: " + numSpills);
}
}
} // MapOutputBuffer

/**
@@ -323,6 +323,7 @@ public interface MRJobConfig {
public static final int DEFAULT_IO_SORT_MB = 100;

public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
String SPILL_FILES_COUNT_LIMIT = "mapreduce.task.spill.files.count.limit";

public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";

@@ -62,6 +62,15 @@
set to less than .5</description>
</property>

<property>
<name>mapreduce.task.spill.files.count.limit</name>
<value>-1</value>
<description>The maximum number of spill files a MapTask may create.
The task fails once this limit is breached. The default value of -1
indicates that there is no limit on the number of spill files
created.</description>
</property>
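
A hedged usage sketch for the new knob (the cap of 50 is an arbitrary example): MapOutputBuffer.init() rejects values below -1, and incrementNumSpills() enforces the cap each time a spill file is produced.

import org.apache.hadoop.mapred.JobConf;

public class SpillLimitConfigDemo {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setInt("mapreduce.task.spill.files.count.limit", 50);
    // Equivalent, via the constant added in this commit:
    // conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, 50);
    // The default of -1 keeps spill file creation unbounded.
    System.out.println(conf.getInt("mapreduce.task.spill.files.count.limit", -1));
  }
}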

<property>
<name>mapreduce.job.local-fs.single-disk-limit.bytes</name>
<value>-1</value>
@@ -27,14 +27,20 @@
import org.apache.hadoop.mapred.MapTask.MapOutputBuffer;
import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
@@ -51,6 +57,9 @@ public void cleanup() throws Exception {
FileUtil.fullyDelete(TEST_ROOT_DIR);
}

@Rule
public ExpectedException exception = ExpectedException.none();

// Verify output files for shuffle have group read permission even when
// the configured umask normally would prevent it.
@Test
@@ -84,4 +93,73 @@ public void testShufflePermissions() throws Exception {
Assert.assertEquals("Incorrect index file perms",
(short)0640, perms.toShort());
}

@Test
public void testSpillFilesCountLimitInvalidValue() throws Exception {
JobConf conf = new JobConf();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, -2);
MapOutputFile mof = new MROutputFiles();
mof.setConf(conf);
TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
MapTask mockTask = mock(MapTask.class);
doReturn(mof).when(mockTask).getMapOutputFile();
doReturn(attemptId).when(mockTask).getTaskID();
doReturn(new Progress()).when(mockTask).getSortPhase();
TaskReporter mockReporter = mock(TaskReporter.class);
doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();

exception.expect(IOException.class);
exception.expectMessage("Invalid value for \"mapreduce.task.spill.files.count.limit\", " +
"current value: -2");

mob.init(ctx);
mob.close();
}

@Test
public void testSpillFilesCountBreach() throws Exception {
JobConf conf = new JobConf();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, 2);
MapOutputFile mof = new MROutputFiles();
mof.setConf(conf);
TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
MapTask mockTask = mock(MapTask.class);
doReturn(mof).when(mockTask).getMapOutputFile();
doReturn(attemptId).when(mockTask).getTaskID();
doReturn(new Progress()).when(mockTask).getSortPhase();
TaskReporter mockReporter = mock(TaskReporter.class);
doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
mob.numSpills = 2;
mob.init(ctx);

Method method = mob.getClass().getDeclaredMethod("incrementNumSpills");
method.setAccessible(true);
boolean gotExceptionWithMessage = false;
try {
method.invoke(mob);
} catch (InvocationTargetException e) {
Throwable targetException = e.getTargetException();
if (targetException != null) {
String errorMessage = targetException.getMessage();
if (errorMessage != null) {
if (errorMessage.equals("Too many spill files got created, control it with " +
"mapreduce.task.spill.files.count.limit, current value: 2, current spill count: 3")) {
gotExceptionWithMessage = true;
}
}
}
}

mob.close();

Assert.assertTrue(gotExceptionWithMessage);
}
}
2 changes: 1 addition & 1 deletion hadoop-project/pom.xml
@@ -37,7 +37,7 @@
<!--Whether to proceed to next module if any test failures exist-->
<maven.test.failure.ignore>true</maven.test.failure.ignore>
<maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
<jetty.version>9.4.51.v20230217</jetty.version>
<jetty.version>9.4.53.v20231009</jetty.version>
<test.exclude>_</test.exclude>
<test.exclude.pattern>_</test.exclude.pattern>

@@ -54,6 +54,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -505,7 +506,10 @@ void generateScript(File localScript) throws IOException {
String classpath = halrJarPath + File.pathSeparator + harJarPath;
FileWriterWithEncoding fw = null;
try {
fw = new FileWriterWithEncoding(localScript, "UTF-8");
fw = FileWriterWithEncoding.builder()
.setFile(localScript)
.setCharset(StandardCharsets.UTF_8)
.get();
fw.write("#!/bin/bash\nset -e\nset -x\n");
int containerCount = 1;
for (AppInfo context : eligibleApplications) {
@@ -32,6 +32,7 @@
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -783,7 +784,10 @@ public static void readAcontainerLogs(DataInputStream valueStream,
OutputStream os = null;
PrintStream ps = null;
try {
os = new WriterOutputStream(writer, Charset.forName("UTF-8"));
os = WriterOutputStream.builder()
.setWriter(writer)
.setCharset(StandardCharsets.UTF_8)
.get();
ps = new PrintStream(os);
while (true) {
try {
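
This hunk and the FileWriterWithEncoding one above follow the same migration: newer commons-io releases deprecate the direct constructors in favor of fluent builders whose get() materializes the configured instance. A self-contained sketch (writer and payload are illustrative):

import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.output.WriterOutputStream;

public class BuilderMigrationDemo {
  public static void main(String[] args) throws Exception {
    StringWriter writer = new StringWriter();
    // Deprecated: new WriterOutputStream(writer, Charset.forName("UTF-8"))
    try (WriterOutputStream os = WriterOutputStream.builder()
        .setWriter(writer)
        .setCharset(StandardCharsets.UTF_8)
        .get()) { // get() builds the stream and may throw IOException
      os.write("hello".getBytes(StandardCharsets.UTF_8));
    }
    System.out.println(writer); // prints: hello
  }
}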
@@ -5132,10 +5132,10 @@

<property>
<name>yarn.router.interceptor.user-thread-pool.keep-alive-time</name>
<value>0s</value>
<value>30s</value>
<description>
This configuration sets the keepAliveTime of the interceptor's thread pool.
Default is 0s.
Default is 30s.
</description>
</property>
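
A plausible motivation for the 0s to 30s change, sketched with a plain ThreadPoolExecutor (pool sizes are illustrative; whether the interceptor pool times out core threads is an assumption): java.util.concurrent retires idle threads immediately under a zero keep-alive, and allowCoreThreadTimeOut(true) even throws IllegalArgumentException when keepAliveTime is 0.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
  public static void main(String[] args) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 16, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    // With keepAliveTime = 0 this call would throw IllegalArgumentException;
    // with 30s, idle core threads linger long enough to absorb request bursts.
    pool.allowCoreThreadTimeOut(true);
    pool.execute(() -> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();
  }
}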

@@ -496,8 +496,9 @@ void testLevelDbRepair() throws IOException {
store.init(conf);
Mockito.verify(factory, Mockito.times(1))
.repair(Mockito.any(File.class), Mockito.any(Options.class));
FileFilter fileFilter = new WildcardFileFilter(
"*" + LeveldbTimelineStore.BACKUP_EXT + "*");
FileFilter fileFilter = WildcardFileFilter.builder()
.setWildcards("*" + LeveldbTimelineStore.BACKUP_EXT +"*")
.get();
assertTrue(path.listFiles(fileFilter).length > 0);
} finally {
store.close();
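
The same builder migration applies here: newer commons-io releases deprecate the WildcardFileFilter constructors. One sketch covers both timeline-store tests, since WildcardFileFilter implements FileFilter as well as FilenameFilter (pattern and directory are illustrative):

import java.io.File;
import java.io.FileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;

public class WildcardFilterDemo {
  public static void main(String[] args) {
    // Deprecated: new WildcardFileFilter("*.backup*")
    FileFilter filter = WildcardFileFilter.builder()
        .setWildcards("*.backup*")
        .get();
    File[] matches = new File(".").listFiles(filter);
    System.out.println(matches == null ? 0 : matches.length);
  }
}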
@@ -444,8 +444,9 @@ void testLevelDbRepair() throws IOException {
store.init(conf);
Mockito.verify(factory, Mockito.times(1))
.repair(Mockito.any(File.class), Mockito.any(Options.class));
FilenameFilter fileFilter =
new WildcardFileFilter("*" + RollingLevelDBTimelineStore.BACKUP_EXT + "*");
FilenameFilter fileFilter = WildcardFileFilter.builder()
.setWildcards("*" + RollingLevelDBTimelineStore.BACKUP_EXT + "*")
.get();
assertTrue(new File(path.getAbsolutePath(), RollingLevelDBTimelineStore.FILENAME)
.list(fileFilter).length > 0);
} finally {
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -95,6 +94,10 @@ public GPGContext getGPGContext() {
return this.gpgContext;
}

public FederationRegistryClient getRegistryClient() {
return this.registryClient;
}

/**
* Query router for applications.
*
@@ -152,18 +155,6 @@ public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+ " success Router queries after " + totalAttemptCount + " retries");
}

protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
List<String> allApps = this.registryClient.getAllApplications();
LOG.info("Got {} existing apps in registry.", allApps.size());
for (String app : allApps) {
ApplicationId appId = ApplicationId.fromString(app);
if (!knownApps.contains(appId)) {
LOG.info("removing finished application entry for {}", app);
this.registryClient.removeAppFromRegistry(appId, true);
}
}
}

@Override
public abstract void run();
}