Skip to content

Commit

Permalink
MAPREDUCE-7474. Improve Manifest committer resilience (#6716)
Browse files Browse the repository at this point in the history
Improve task commit resilience everywhere
and add an option to reduce delete IO requests on
job cleanup (relevant for ABFS and HDFS).

Task Commit Resilience
----------------------

Task manifest saving is re-attempted on failure; the number of 
attempts made is configurable with the option:

  mapreduce.manifest.committer.manifest.save.attempts

* The default is 5.
* The minimum is 1; asking for less is ignored.
* A retry policy adds 500ms of sleep per attempt.
* Move from classic rename() to commitFile() to rename the file,
  after calling getFileStatus() to get its length and possibly etag.
  This becomes a rename() on gcs/hdfs anyway, but on abfs it does reach
  the ResilientCommitByRename callbacks in abfs, which report on
  the outcome to the caller...which is then logged at WARN.
* New statistic task_stage_save_summary_file to distinguish from
  other saving operations (job success/report file).
  This is only saved to the manifest on task commit retries, and
  provides statistics on all previous unsuccessful attempts to save
  the manifests
+ test changes to match the codepath changes, including improvements
  in fault injection.

Directory size for deletion
---------------------------

New option

  mapreduce.manifest.committer.cleanup.parallel.delete.base.first

This makes an initial attempt at deleting the base dir, only falling
back to parallel deletes if there's a timeout.

This option is disabled by default; consider enabling it for abfs to
reduce IO load. Consult the documentation for more details.

Success file printing
---------------------

The command to print a JSON _SUCCESS file from this committer and
any S3A committer is now something which can be invoked from
the mapred command:

  mapred successfile <path to file>

Contributed by Steve Loughran
  • Loading branch information
steveloughran authored May 13, 2024
1 parent 12e0ca6 commit c927060
Show file tree
Hide file tree
Showing 34 changed files with 1,438 additions and 262 deletions.
4 changes: 4 additions & 0 deletions hadoop-mapreduce-project/bin/mapred
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ function hadoop_usage
hadoop_add_subcommand "frameworkuploader" admin "mapreduce framework upload"
hadoop_add_subcommand "version" client "print the version"
hadoop_add_subcommand "minicluster" client "CLI MiniCluster"
hadoop_add_subcommand "successfile" client "Print a _SUCCESS manifest from the manifest and S3A committers"
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
}

Expand Down Expand Up @@ -102,6 +103,9 @@ function mapredcmd_case
version)
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
;;
successfile)
HADOOP_CLASSNAME=org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter
;;
minicluster)
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/timelineservice"'/*'
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/test"'/*'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.io.IOException;
import java.util.Objects;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -51,6 +54,9 @@
*/
public final class ManifestCommitterConfig implements IOStatisticsSource {

private static final Logger LOG = LoggerFactory.getLogger(
ManifestCommitterConfig.class);

/**
* Final destination of work.
* This is <i>unqualified</i>.
Expand Down Expand Up @@ -153,6 +159,12 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
*/
private final int writerQueueCapacity;

/**
* How many attempts to save a task manifest by save and rename
* before giving up.
*/
private final int saveManifestAttempts;

/**
* Constructor.
* @param outputPath destination path of the job.
Expand Down Expand Up @@ -198,6 +210,14 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
this.writerQueueCapacity = conf.getInt(
OPT_WRITER_QUEUE_CAPACITY,
DEFAULT_WRITER_QUEUE_CAPACITY);
// Read the configured number of manifest save attempts.
// At least one attempt must always be made, so any value below 1
// is logged at WARN and replaced with 1 rather than rejected.
int attempts = conf.getInt(OPT_MANIFEST_SAVE_ATTEMPTS,
OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT);
if (attempts < 1) {
LOG.warn("Invalid value for {}: {}",
OPT_MANIFEST_SAVE_ATTEMPTS, attempts);
attempts = 1;
}
this.saveManifestAttempts = attempts;

// if constructed with a task attempt, build the task ID and path.
if (context instanceof TaskAttemptContext) {
Expand Down Expand Up @@ -332,6 +352,10 @@ public String getName() {
return name;
}

/**
* Get the number of attempts to make when saving a task manifest
* by save and rename before giving up.
* @return the attempt count; the constructor raises any configured
* value below 1 to 1, so this is always at least 1.
*/
public int getSaveManifestAttempts() {
return saveManifestAttempts;
}

/**
* Get writer queue capacity.
* @return the queue capacity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ public final class ManifestCommitterConstants {
* Should dir cleanup do parallel deletion of task attempt dirs
* before trying to delete the toplevel dirs.
* For GCS this may deliver speedup, while on ABFS it may avoid
* timeouts in certain deployments.
* timeouts in certain deployments, something
* {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST}
* can alleviate.
* Value: {@value}.
*/
public static final String OPT_CLEANUP_PARALLEL_DELETE =
Expand All @@ -143,6 +145,20 @@ public final class ManifestCommitterConstants {
*/
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT = true;

/**
* Should parallel cleanup try to delete the base first?
* Best for azure as it skips the task attempt deletions unless
* the toplevel delete fails.
* Value: {@value}.
*/
public static final String OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST =
OPT_PREFIX + "cleanup.parallel.delete.base.first";

/**
* Default value of option {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST}: {@value}.
*/
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT = false;

/**
* Threads to use for IO.
*/
Expand Down Expand Up @@ -260,6 +276,19 @@ public final class ManifestCommitterConstants {
*/
public static final int DEFAULT_WRITER_QUEUE_CAPACITY = OPT_IO_PROCESSORS_DEFAULT;

/**
* How many attempts to save a task manifest by save and rename
* before giving up.
* Value: {@value}.
*/
public static final String OPT_MANIFEST_SAVE_ATTEMPTS =
OPT_PREFIX + "manifest.save.attempts";

/**
* Default value of {@link #OPT_MANIFEST_SAVE_ATTEMPTS}: {@value}.
*/
public static final int OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT = 5;

private ManifestCommitterConstants() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ public final class ManifestCommitterStatisticNames {
public static final String OP_SAVE_TASK_MANIFEST =
"task_stage_save_task_manifest";

/**
* Save a summary file: {@value}.
*/
public static final String OP_SAVE_SUMMARY_FILE =
"task_stage_save_summary_file";

/**
* Task abort: {@value}.
*/
Expand Down Expand Up @@ -259,6 +265,9 @@ public final class ManifestCommitterStatisticNames {
public static final String OP_STAGE_TASK_SCAN_DIRECTORY
= "task_stage_scan_directory";

/** Delete a directory: {@value}. */
public static final String OP_DELETE_DIR = "op_delete_dir";

private ManifestCommitterStatisticNames() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public class ManifestPrinter extends Configured implements Tool {

private static final String USAGE = "ManifestPrinter <success-file>";
private static final String USAGE = "successfile <success-file>";

/**
* Output for printing.
Expand Down Expand Up @@ -88,7 +88,7 @@ public ManifestSuccessData loadAndPrintManifest(FileSystem fs, Path path)
return success;
}

private void printManifest(ManifestSuccessData success) {
public void printManifest(ManifestSuccessData success) {
field("succeeded", success.getSuccess());
field("created", success.getDate());
field("committer", success.getCommitter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ private InternalConstants() {
OP_CREATE_ONE_DIRECTORY,
OP_DIRECTORY_SCAN,
OP_DELETE,
OP_DELETE_DIR,
OP_DELETE_FILE_UNDER_DESTINATION,
OP_GET_FILE_STATUS,
OP_IS_DIRECTORY,
Expand All @@ -85,6 +86,7 @@ private InternalConstants() {
OP_MSYNC,
OP_PREPARE_DIR_ANCESTORS,
OP_RENAME_FILE,
OP_SAVE_SUMMARY_FILE,
OP_SAVE_TASK_MANIFEST,

OBJECT_LIST_REQUEST,
Expand Down Expand Up @@ -127,4 +129,11 @@ private InternalConstants() {
/** Schemas of filesystems we know to not work with this committer. */
public static final Set<String> UNSUPPORTED_FS_SCHEMAS =
ImmutableSet.of("s3a", "wasb");

/**
* Interval in milliseconds between save retries.
* Value {@value} milliseconds.
*/
public static final int SAVE_SLEEP_INTERVAL = 500;

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,35 @@ public boolean isFile(Path path) throws IOException {
public abstract boolean delete(Path path, boolean recursive)
throws IOException;

/**
* Forward to {@code delete(Path, false)}
* unless overridden; the delete is non-recursive.
* <p>
* If it returns without an error: there is no file at
* the end of the path.
* @param path path of the file to delete
* @return the outcome of the {@code delete()} call
* @throws IOException failure.
*/
public boolean deleteFile(Path path)
throws IOException {
return delete(path, false);
}

/**
* Recursively delete a path: forwards to {@code delete(Path, true)}
* unless overridden, which is expected to be
* {@code FileSystem#delete(Path, true)} or equivalent.
* <p>
* If it returns without an error: there is nothing at
* the end of the path.
* @param path path to delete
* @return the outcome of the {@code delete()} call
* @throws IOException failure.
*/
public boolean deleteRecursive(Path path)
throws IOException {
return delete(path, true);
}

/**
* Forward to {@link FileSystem#mkdirs(Path)}.
* Usual "what does 'false' mean" ambiguity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public boolean delete(Path path, boolean recursive)
return fileSystem.delete(path, recursive);
}

/**
* Recursively delete a path by calling {@code delete(path, true)}
* directly on the {@code fileSystem} field.
* @param path path to delete
* @return the value returned by the filesystem's delete call
* @throws IOException failure.
*/
@Override
public boolean deleteRecursive(final Path path) throws IOException {
return fileSystem.delete(path, true);
}

@Override
public boolean mkdirs(Path path)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_ABORT_TASK;

/**
Expand Down Expand Up @@ -55,7 +56,11 @@ protected Path executeStage(final Boolean suppressExceptions)
final Path dir = getTaskAttemptDir();
if (dir != null) {
LOG.info("{}: Deleting task attempt directory {}", getName(), dir);
deleteDir(dir, suppressExceptions);
if (suppressExceptions) {
deleteRecursiveSuppressingExceptions(dir, OP_DELETE_DIR);
} else {
deleteRecursive(dir, OP_DELETE_DIR);
}
}
return dir;
}
Expand Down
Loading

0 comments on commit c927060

Please sign in to comment.