From fccc268cdea8db5cbe69962c040b17069d2ec302 Mon Sep 17 00:00:00 2001 From: Felix Nguyen Date: Mon, 23 Sep 2024 15:39:56 +0800 Subject: [PATCH] HADOOP-19283. Move all DistCp execution logic to execute() (#7060) Co-authored-by: Felix Nguyen --- .../java/org/apache/hadoop/tools/DistCp.java | 28 ++++++++++++++----- .../apache/hadoop/tools/TestExternalCall.java | 5 +++- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 141f45d61f372..cba47545797f3 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -140,9 +140,7 @@ public int run(String[] argv) { try { context = new DistCpContext(OptionsParser.parse(argv)); - checkSplitLargeFile(); - setTargetPathExists(); - LOG.info("Input Options: " + context); + LOG.info("Input Options: {}", context); } catch (Throwable e) { LOG.error("Invalid arguments: ", e); System.err.println("Invalid arguments: " + e.getMessage()); @@ -152,7 +150,7 @@ public int run(String[] argv) { Job job = null; try { - job = execute(); + job = execute(true); } catch (InvalidInputException e) { LOG.error("Invalid input: ", e); return DistCpConstants.INVALID_ARGUMENT; @@ -169,7 +167,7 @@ public int run(String[] argv) { LOG.error("Exception encountered ", e); return DistCpConstants.UNKNOWN_ERROR; } finally { - //Blocking distcp so close the job after its done + // Blocking distcp so close the job after it's done if (job != null && context.shouldBlock()) { try { job.close(); @@ -181,15 +179,31 @@ public int run(String[] argv) { return DistCpConstants.SUCCESS; } + /** + * Original entrypoint of a distcp job. Calls {@link DistCp#execute(boolean)} + * without doing extra context checks and setting some configs. + * @return Job handle + * @throws Exception when fails to submit distcp job or distcp job fails + */ + public Job execute() throws Exception { + return execute(false); + } + /** * Implements the core-execution. Creates the file-list for copy, * and launches the Hadoop-job, to do the copy. + * @param extraContextChecks if true, does extra context checks and sets some configs. * @return Job handle - * @throws Exception + * @throws Exception when fails to submit distcp job or distcp job fails, or context checks fail */ - public Job execute() throws Exception { + public Job execute(boolean extraContextChecks) throws Exception { Preconditions.checkState(context != null, "The DistCpContext should have been created before running DistCp!"); + if (extraContextChecks) { + checkSplitLargeFile(); + setTargetPathExists(); + } + Job job = createAndSubmitJob(); if (context.shouldBlock()) { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java index eba86a985addf..435181ce59c5e 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java @@ -156,7 +156,10 @@ public void testCleanupOfJob() throws Exception { DistCp distcp = mock(DistCp.class); Job job = spy(Job.class); Mockito.when(distcp.getConf()).thenReturn(conf); - Mockito.when(distcp.execute()).thenReturn(job); + Mockito.when(distcp.createAndSubmitJob()).thenReturn(job); + Mockito.when(distcp.execute()).thenCallRealMethod(); + Mockito.when(distcp.execute(Mockito.anyBoolean())).thenCallRealMethod(); + Mockito.doReturn(true).when(job).waitForCompletion(Mockito.anyBoolean()); Mockito.when(distcp.run(Mockito.any())).thenCallRealMethod(); String[] arg = { soure.toString(), target.toString() };