Skip to content

Commit

Permalink
MAPREDUCE-7457: Added support to limit count of spill files (#6155) C…
Browse files Browse the repository at this point in the history
…ontributed by Mudit Sharma.

Reviewed-by: Shilun Fan <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
mudit1289 authored Oct 30, 2023
1 parent 254dbab commit f1ce273
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,10 @@ public static class MapOutputBuffer<K extends Object, V extends Object>
new ArrayList<SpillRecord>();
private int totalIndexCacheMemory;
private int indexCacheMemoryLimit;
private int spillFilesCountLimit;
private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
private static final int SPILL_FILES_COUNT_LIMIT_DEFAULT = -1;
private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;

private MapTask mapTask;
private MapOutputFile mapOutputFile;
Expand Down Expand Up @@ -984,10 +987,17 @@ public void init(MapOutputCollector.Context context
MRJobConfig.DEFAULT_IO_SORT_MB);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
spillFilesCountLimit = job.getInt(JobContext.SPILL_FILES_COUNT_LIMIT,
SPILL_FILES_COUNT_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
}
if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
&& spillFilesCountLimit < 0) {
throw new IOException("Invalid value for \"" + JobContext.SPILL_FILES_COUNT_LIMIT + "\", " +
"current value: " + spillFilesCountLimit);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
Expand Down Expand Up @@ -1698,7 +1708,7 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
incrementNumSpills();
} finally {
if (out != null) out.close();
if (partitionOut != null) {
Expand Down Expand Up @@ -1774,7 +1784,7 @@ private void spillSingleRecord(final K key, final V value,
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
incrementNumSpills();
} finally {
if (out != null) out.close();
if (partitionOut != null) {
Expand Down Expand Up @@ -2022,14 +2032,29 @@ private void sameVolRename(Path srcPath,
if (!dst.getParentFile().exists()) {
if (!dst.getParentFile().mkdirs()) {
throw new IOException("Unable to rename " + src + " to "
+ dst + ": couldn't create parent directory");
+ dst + ": couldn't create parent directory");
}
}

if (!src.renameTo(dst)) {
throw new IOException("Unable to rename " + src + " to " + dst);
}
}

/**
 * Increments the local spill counter {@code numSpills} while enforcing the
 * configured upper bound on the number of spill files this task may create.
 *
 * @throws IOException if {@code mapreduce.task.spill.files.count.limit} is
 *         set to a non-negative value and the new spill count exceeds it
 */
private void incrementNumSpills() throws IOException {
  ++numSpills;
  // limit == SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE (-1) means "no limit";
  // any other value was validated as non-negative in init().
  if (spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
      && numSpills > spillFilesCountLimit) {
    // Reference the config-key constant so the message cannot drift from the
    // property name; the resulting string is unchanged.
    throw new IOException("Too many spill files got created, control it with " +
        JobContext.SPILL_FILES_COUNT_LIMIT + ", current value: " + spillFilesCountLimit +
        ", current spill count: " + numSpills);
  }
}
} // MapOutputBuffer

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ public interface MRJobConfig {
public static final int DEFAULT_IO_SORT_MB = 100;

public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
String SPILL_FILES_COUNT_LIMIT = "mapreduce.task.spill.files.count.limit";

public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@
set to less than .5</description>
</property>

<property>
<name>mapreduce.task.spill.files.count.limit</name>
<value>-1</value>
<description>Maximum number of spill files that can be created by a MapTask.
After breaching this limit, the task will fail. The default value of -1
indicates that there is no limit on the number of spill files being
created.</description>
</property>

<property>
<name>mapreduce.job.local-fs.single-disk-limit.bytes</name>
<value>-1</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@
import org.apache.hadoop.mapred.MapTask.MapOutputBuffer;
import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
Expand All @@ -51,6 +57,9 @@ public void cleanup() throws Exception {
FileUtil.fullyDelete(TEST_ROOT_DIR);
}

@Rule
public ExpectedException exception = ExpectedException.none();

// Verify output files for shuffle have group read permission even when
// the configured umask normally would prevent it.
@Test
Expand Down Expand Up @@ -84,4 +93,73 @@ public void testShufflePermissions() throws Exception {
Assert.assertEquals("Incorrect index file perms",
(short)0640, perms.toShort());
}

@Test
public void testSpillFilesCountLimitInvalidValue() throws Exception {
  // A spill-file limit that is negative but not the unbounded sentinel (-1)
  // must be rejected by MapOutputBuffer.init().
  JobConf jobConf = new JobConf();
  jobConf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
  jobConf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
  jobConf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, -2);

  MapOutputFile outputFiles = new MROutputFiles();
  outputFiles.setConf(jobConf);

  TaskAttemptID taskAttemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
  MapTask task = mock(MapTask.class);
  doReturn(outputFiles).when(task).getMapOutputFile();
  doReturn(taskAttemptId).when(task).getTaskID();
  doReturn(new Progress()).when(task).getSortPhase();

  TaskReporter reporter = mock(TaskReporter.class);
  doReturn(new Counter()).when(reporter).getCounter(any(TaskCounter.class));

  MapOutputCollector.Context context =
      new MapOutputCollector.Context(task, jobConf, reporter);
  MapOutputBuffer<Object, Object> buffer = new MapOutputBuffer<>();

  exception.expect(IOException.class);
  exception.expectMessage("Invalid value for \"mapreduce.task.spill.files.count.limit\", " +
      "current value: -2");

  buffer.init(context);
  buffer.close();
}

@Test
public void testSpillFilesCountBreach() throws Exception {
  // Allow at most 2 spill files, pre-set the counter at the limit, and verify
  // that the next increment fails with the expected message.
  JobConf jobConf = new JobConf();
  jobConf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
  jobConf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
  jobConf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, 2);

  MapOutputFile outputFiles = new MROutputFiles();
  outputFiles.setConf(jobConf);

  TaskAttemptID taskAttemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
  MapTask task = mock(MapTask.class);
  doReturn(outputFiles).when(task).getMapOutputFile();
  doReturn(taskAttemptId).when(task).getTaskID();
  doReturn(new Progress()).when(task).getSortPhase();

  TaskReporter reporter = mock(TaskReporter.class);
  doReturn(new Counter()).when(reporter).getCounter(any(TaskCounter.class));

  MapOutputCollector.Context context =
      new MapOutputCollector.Context(task, jobConf, reporter);
  MapOutputBuffer<Object, Object> buffer = new MapOutputBuffer<>();
  buffer.numSpills = 2;
  buffer.init(context);

  // incrementNumSpills() is private; drive it through reflection.
  Method increment = buffer.getClass().getDeclaredMethod("incrementNumSpills");
  increment.setAccessible(true);

  String expectedMessage = "Too many spill files got created, control it with " +
      "mapreduce.task.spill.files.count.limit, current value: 2, current spill count: 3";
  boolean sawExpectedException = false;
  try {
    increment.invoke(buffer);
  } catch (InvocationTargetException e) {
    Throwable cause = e.getTargetException();
    // Null-safe comparison: constant.equals(...) tolerates a null message.
    sawExpectedException = cause != null && expectedMessage.equals(cause.getMessage());
  }

  buffer.close();

  Assert.assertTrue(sawExpectedException);
}
}

0 comments on commit f1ce273

Please sign in to comment.