Skip to content

Commit

Permalink
HDFS-17611. Move all DistCp execution logic to execute() (#7025)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Felix Nguyen <[email protected]>
  • Loading branch information
kokon191 and Felix Nguyen authored Sep 23, 2024
1 parent 28538d6 commit ff75aa5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ public int run(String[] argv) {

try {
context = new DistCpContext(OptionsParser.parse(argv));
checkSplitLargeFile();
setTargetPathExists();
LOG.info("Input Options: " + context);
LOG.info("Input Options: {}", context);
} catch (Throwable e) {
LOG.error("Invalid arguments: ", e);
System.err.println("Invalid arguments: " + e.getMessage());
Expand All @@ -152,7 +150,7 @@ public int run(String[] argv) {

Job job = null;
try {
job = execute();
job = execute(true);
} catch (InvalidInputException e) {
LOG.error("Invalid input: ", e);
return DistCpConstants.INVALID_ARGUMENT;
Expand All @@ -169,7 +167,7 @@ public int run(String[] argv) {
LOG.error("Exception encountered ", e);
return DistCpConstants.UNKNOWN_ERROR;
} finally {
//Blocking distcp so close the job after its done
// Blocking distcp so close the job after it's done
if (job != null && context.shouldBlock()) {
try {
job.close();
Expand All @@ -181,15 +179,31 @@ public int run(String[] argv) {
return DistCpConstants.SUCCESS;
}

/**
* Original entrypoint of a distcp job. Calls {@link DistCp#execute(boolean)}
* without doing extra context checks and setting some configs.
* @return Job handle
* @throws Exception when fails to submit distcp job or distcp job fails
*/
public Job execute() throws Exception {
return execute(false);
}

/**
* Implements the core-execution. Creates the file-list for copy,
* and launches the Hadoop-job, to do the copy.
* @param extraContextChecks if true, does extra context checks and sets some configs.
* @return Job handle
* @throws Exception
* @throws Exception when fails to submit distcp job or distcp job fails, or context checks fail
*/
public Job execute() throws Exception {
public Job execute(boolean extraContextChecks) throws Exception {
Preconditions.checkState(context != null,
"The DistCpContext should have been created before running DistCp!");
if (extraContextChecks) {
checkSplitLargeFile();
setTargetPathExists();
}

Job job = createAndSubmitJob();

if (context.shouldBlock()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ public void testCleanupOfJob() throws Exception {
DistCp distcp = mock(DistCp.class);
Job job = spy(Job.class);
Mockito.when(distcp.getConf()).thenReturn(conf);
Mockito.when(distcp.execute()).thenReturn(job);
Mockito.when(distcp.createAndSubmitJob()).thenReturn(job);
Mockito.when(distcp.execute()).thenCallRealMethod();
Mockito.when(distcp.execute(Mockito.anyBoolean())).thenCallRealMethod();
Mockito.doReturn(true).when(job).waitForCompletion(Mockito.anyBoolean());
Mockito.when(distcp.run(Mockito.any())).thenCallRealMethod();
String[] arg = { soure.toString(), target.toString() };

Expand Down

0 comments on commit ff75aa5

Please sign in to comment.