diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java
index 1dc50370ef..72d9dd0e69 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java
@@ -16,54 +16,69 @@
package io.cdap.plugin.gcp.gcs.sink;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
/**
* Output Committer which creates and delegates operations to other GCS Output Committer instances.
- *
+ *
* Delegated instances are created based on a supplied Output Format and Destination Table Names.
*/
public class DelegatingGCSOutputCommitter extends OutputCommitter {
- private final Map<String, OutputCommitter> committerMap;
- public DelegatingGCSOutputCommitter() {
- committerMap = new HashMap<>();
+ private final TaskAttemptContext taskAttemptContext;
+ private boolean firstTable = true;
+ private static final String PARTITIONS_FILE_SUFFIX = "_partitions.txt";
+
+ public DelegatingGCSOutputCommitter(TaskAttemptContext taskAttemptContext) {
+ this.taskAttemptContext = taskAttemptContext;
}
/**
* Add a new GCSOutputCommitter based on a supplied Output Format and Table Name.
- *
+ *
* This GCS Output Committer gets initialized when created.
*/
@SuppressWarnings("rawtypes")
public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
- TaskAttemptContext context,
String tableName) throws IOException, InterruptedException {
//Set output directory
- context.getConfiguration().set(FileOutputFormat.OUTDIR,
- DelegatingGCSOutputUtils.buildOutputPath(context.getConfiguration(), tableName));
+ taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR,
+ DelegatingGCSOutputUtils.buildOutputPath(
+ taskAttemptContext.getConfiguration(), tableName));
//Wrap output committer into the GCS Output Committer.
- GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(context));
+ GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext));
- //Initialize the new GCS Output Committer and add it to the Committer Map
- gcsOutputCommitter.setupJob(context);
- gcsOutputCommitter.setupTask(context);
- committerMap.put(tableName, gcsOutputCommitter);
+ gcsOutputCommitter.setupJob(taskAttemptContext);
+ gcsOutputCommitter.setupTask(taskAttemptContext);
+ writePartitionFile(taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR), taskAttemptContext);
+ firstTable = false;
}
@Override
public void setupJob(JobContext jobContext) throws IOException {
- //no-op
+ Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+ FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
+ Path tempPath = new Path(outputPath, getPendingDirPath(jobContext.getJobID()));
+ fs.mkdirs(tempPath);
}
@Override
@@ -73,39 +88,40 @@ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException
@Override
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
- if (committerMap.isEmpty()) {
- return false;
- }
-
- boolean needsTaskCommit = true;
-
- for (OutputCommitter committer : committerMap.values()) {
- needsTaskCommit = needsTaskCommit && committer.needsTaskCommit(taskAttemptContext);
- }
-
- return needsTaskCommit;
+ return true;
}
@Override
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
- for (OutputCommitter committer : committerMap.values()) {
+ for (String output : getOutputPaths(taskAttemptContext)) {
+ FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
committer.commitTask(taskAttemptContext);
}
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
- for (OutputCommitter committer : committerMap.values()) {
+ for (String output : getOutputPaths(jobContext)) {
+ FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
committer.commitJob(jobContext);
}
+ cleanupJob(jobContext);
+ }
+
+ @Override
+ public void cleanupJob(JobContext jobContext) throws IOException {
+ Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+ FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
+ // delete the temporary directory that has partition information in text files.
+ fs.delete(new Path(outputPath, getPendingDirPath(jobContext.getJobID())), true);
}
@Override
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
IOException ioe = null;
-
- for (OutputCommitter committer : committerMap.values()) {
+ for (String output : getOutputPaths(taskAttemptContext)) {
try {
+ FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
committer.abortTask(taskAttemptContext);
} catch (IOException e) {
if (ioe == null) {
@@ -124,21 +140,108 @@ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException
@Override
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
IOException ioe = null;
-
- for (OutputCommitter committer : committerMap.values()) {
- try {
+ try {
+ for (String output : getOutputPaths(jobContext)) {
+ taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR, output);
+ FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
committer.abortJob(jobContext, state);
- } catch (IOException e) {
- if (ioe == null) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
}
+ } catch (IOException e) {
+ if (ioe == null) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ } finally {
+ cleanupJob(jobContext);
}
-
if (ioe != null) {
throw ioe;
}
}
+
+  // Returns the output paths recorded for the whole job, based on the JobContext configuration.
+  private Set<String> getOutputPaths(JobContext jobContext) throws IOException {
+ Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+ FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
+ return getOutputPathsFromTempPartitionFile(outputPath, fs, null, jobContext.getJobID());
+ }
+
+  private Set<String> getOutputPaths(TaskAttemptContext taskAttemptContext) throws IOException {
+ Path outputPath = new Path(
+ taskAttemptContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+ FileSystem fs = outputPath.getFileSystem(taskAttemptContext.getConfiguration());
+ return getOutputPathsFromTempPartitionFile(outputPath, fs,
+ taskAttemptContext.getTaskAttemptID().getTaskID().toString(),
+ taskAttemptContext.getJobID());
+ }
+
+  /**
+   * Returns the full output paths (up to the path suffix) recorded in the {taskId}_partitions.txt files.
+   * When called with a task ID, only that task's partition file is read; otherwise the partition files of
+   * all tasks are read.
+   *
+   * @param baseOutputPath base output directory that contains the pending directory
+   * @param fs             file system used to read the partition files
+   * @param taskId         task ID whose partition file should be read, or null to read all partition files
+   * @param jobID          job ID used to locate the pending directory
+   * @return set of output paths read from the partition files
+   * @throws IOException if the partition files cannot be read
+   */
+  private Set<String> getOutputPathsFromTempPartitionFile(Path baseOutputPath, FileSystem fs, @Nullable String taskId,
+                                                          JobID jobID) throws IOException {
+    Set<String> outputPaths = new HashSet<>();
+ Path tempPath = taskId == null ? new Path(baseOutputPath, getPendingDirPath(jobID))
+ : new Path(baseOutputPath, String.format("%s/%s%s", getPendingDirPath(jobID), taskId,
+ PARTITIONS_FILE_SUFFIX));
+
+ if (!fs.exists(tempPath)) {
+ return outputPaths;
+ }
+
+ for (FileStatus status : fs.listStatus(tempPath)) {
+ if (status.getPath().getName().endsWith(PARTITIONS_FILE_SUFFIX)) {
+ try (FSDataInputStream dis = fs.open(status.getPath())) {
+ while (true) {
+ try {
+ outputPaths.add(dis.readUTF());
+ } catch (EOFException e) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ return outputPaths;
+ }
+
+  /**
+   * Creates the _temporary_{jobID} directory under the base output directory and appends the given path to a
+   * file named {taskId}_partitions.txt, which stores the full output path up to the path suffix,
+   * e.g. gs://basepath/tablename/path_suffix.
+   *
+   * @param path    output path for the current table, up to the path suffix
+   * @param context task attempt context used to resolve the file system and task ID
+   * @throws IOException if the partition file cannot be created or written
+   */
+ private void writePartitionFile(String path, TaskAttemptContext context) throws IOException {
+ Path outputPath = new Path(context.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+ Path tempPath = new Path(outputPath, getPendingDirPath(context.getJobID()));
+ FileSystem fs = tempPath.getFileSystem(context.getConfiguration());
+ String taskId = context.getTaskAttemptID().getTaskID().toString();
+ Path taskPartitionFile = new Path(tempPath, String.format("%s%s", taskId, PARTITIONS_FILE_SUFFIX));
+ if (!fs.exists(taskPartitionFile)) {
+ fs.createNewFile(taskPartitionFile);
+ } else if (firstTable) {
+ fs.create(taskPartitionFile, true);
+ }
+ try (DataOutputStream out = fs.append(taskPartitionFile)) {
+ out.writeUTF(path);
+ }
+ }
+
+  // Returns the name of the _temporary_{jobId} directory used to hold the partition files.
+  // The job ID is added as a suffix so that multiple pipelines can write to the same path in parallel.
+ private String getPendingDirPath(JobID jobId) {
+ return String.format("%s_%s", FileOutputCommitter.PENDING_DIR_NAME, jobId);
+ }
+
}
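The committer above coordinates tasks through plain-text partition files rather than shared in-memory state: each task appends its resolved output directories (one writeUTF entry per table) to a {taskId}_partitions.txt file under _temporary_{jobId}, and commitJob later reads every such file back with readUTF until EOF. Below is a minimal, self-contained sketch of that handoff, not part of the patch; the paths and names are illustrative, and for brevity it overwrites the partition file where the committer itself uses fs.append.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.EOFException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class PartitionFileRoundTripSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path base = new Path("file:///tmp/delegating-gcs-example");
    Path pendingDir = new Path(base, "_temporary_job_0001");
    FileSystem fs = pendingDir.getFileSystem(conf);
    fs.mkdirs(pendingDir);

    // Task side: record one output path per table in this task's partition file.
    Path taskFile = new Path(pendingDir, "task_0001_m_000000_partitions.txt");
    try (FSDataOutputStream out = fs.create(taskFile, true)) {
      out.writeUTF("file:///tmp/delegating-gcs-example/tableA/2024-01-01");
      out.writeUTF("file:///tmp/delegating-gcs-example/tableB/2024-01-01");
    }

    // Job side: gather every recorded path from all *_partitions.txt files.
    Set<String> outputPaths = new HashSet<>();
    for (FileStatus status : fs.listStatus(pendingDir)) {
      if (!status.getPath().getName().endsWith("_partitions.txt")) {
        continue;
      }
      try (FSDataInputStream in = fs.open(status.getPath())) {
        while (true) {
          try {
            outputPaths.add(in.readUTF());
          } catch (EOFException e) {
            break; // end of this partition file
          }
        }
      }
    }
    System.out.println(outputPaths);

    // Like cleanupJob: remove the pending directory once the paths have been consumed.
    fs.delete(pendingDir, true);
  }
}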
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java
index f2be3db50c..2fe4eeff03 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java
@@ -20,7 +20,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -39,10 +38,8 @@ public class DelegatingGCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord>
Configuration hConf = context.getConfiguration();
String partitionField = hConf.get(PARTITION_FIELD);
- return new DelegatingGCSRecordWriter(context, partitionField, outputCommitter);
+ return new DelegatingGCSRecordWriter(context, partitionField, getOutputCommitter(context));
}
@Override
@@ -74,8 +71,8 @@ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
}
@Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- return outputCommitter;
+ public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext context) {
+ return new DelegatingGCSOutputCommitter(context);
}
}
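With this change the output format no longer caches a committer in a field: every getOutputCommitter(context) call builds a fresh DelegatingGCSOutputCommitter bound to the supplied task attempt, and getRecordWriter wires that committer into the record writer. A small sketch of the new contract, reusing the configuration keys exercised by the test class further below; the delegate class name, paths, suffix, and attempt ID are placeholder assumptions.

package io.cdap.plugin.gcp.gcs.sink; // same package as the classes above, purely for the sketch

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class CommitterPerCallSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    conf.set(DelegatingGCSOutputFormat.DELEGATE_CLASS,
             "io.cdap.plugin.format.avro.output.StructuredAvroOutputFormat");
    conf.set(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR, "file:///tmp/output");
    conf.set(DelegatingGCSOutputFormat.PARTITION_FIELD, "key1");
    conf.set(DelegatingGCSOutputFormat.OUTPUT_PATH_SUFFIX, "2024-01-01");

    TaskAttemptID attemptId = TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0");
    TaskAttemptContext context = new TaskAttemptContextImpl(conf, attemptId);

    DelegatingGCSOutputFormat format = new DelegatingGCSOutputFormat();
    // Each call returns a new committer bound to this task attempt; no state is shared across calls.
    DelegatingGCSOutputCommitter first = format.getOutputCommitter(context);
    DelegatingGCSOutputCommitter second = format.getOutputCommitter(context);
    System.out.println(first != second); // true
  }
}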
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java
index b1c920a954..3bd6a13cc0 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java
@@ -21,8 +21,6 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
@@ -34,7 +32,6 @@
* This Record Writer will initialize record writes and Output Committers as needed.
*/
public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
- private static final Logger LOG = LoggerFactory.getLogger(DelegatingGCSRecordWriter.class);
private final TaskAttemptContext context;
private final String partitionField;
private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
@@ -63,7 +60,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException, InterruptedException
DelegatingGCSOutputUtils.getDelegateFormat(context.getConfiguration());
//Initialize GCS Output Committer for this format.
- delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, context, tableName);
+ delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, tableName);
//Add record writer to delegate map.
delegate = format.getRecordWriter(context);
@@ -79,12 +76,6 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedException
for (RecordWriter<NullWritable, StructuredRecord> delegate : delegateMap.values()) {
delegate.close(context);
}
-
- // Call the Commit Task and Commit Job implementations of this plugin to copy files into their final directory.
- // We need to do this at this stage because the OutputCommitter needs to be aware of the different partitions
- // that have been stored so far.
- delegatingGCSOutputCommitter.commitTask(context);
- delegatingGCSOutputCommitter.commitJob(context);
}
}
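The block removed from close() above is the heart of this change: the record writer no longer commits as part of closing. Commits are now driven through the committer callbacks, in the order sketched below against the same local-file-system wiring used by the test class that follows (the schema, paths, suffix, and attempt ID are assumptions copied from that test).

package io.cdap.plugin.gcp.gcs.sink; // same package as the classes above, purely for the sketch

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class DelegatingWriterLifecycleSketch {
  private static final String SCHEMA = "{\"type\":\"record\",\"name\":\"text\",\"fields\":["
    + "{\"name\":\"key1\",\"type\":\"string\"},{\"name\":\"key2\",\"type\":\"string\"}]}";

  public static void main(String[] args) throws Exception {
    String attempt = "attempt_200707121733_0001_m_000000_0";
    JobConf conf = new JobConf();
    FileOutputFormat.setOutputPath(conf, new Path("file:///tmp/output"));
    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
    conf.set(DelegatingGCSOutputFormat.DELEGATE_CLASS,
             "io.cdap.plugin.format.avro.output.StructuredAvroOutputFormat");
    conf.set(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR, "file:///tmp/output");
    conf.set(DelegatingGCSOutputFormat.PARTITION_FIELD, "key1");
    conf.set(DelegatingGCSOutputFormat.OUTPUT_PATH_SUFFIX, "2024-01-01");
    conf.set("avro.schema.output.key", SCHEMA);

    TaskAttemptID attemptId = TaskAttemptID.forName(attempt);
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, attemptId);
    JobContext jobContext = new JobContextImpl(conf, attemptId.getJobID());

    DelegatingGCSOutputCommitter committer = new DelegatingGCSOutputCommitter(taskContext);
    DelegatingGCSRecordWriter writer = new DelegatingGCSRecordWriter(taskContext, "key1", committer);

    StructuredRecord record = StructuredRecord.builder(Schema.parseJson(SCHEMA))
      .set("key1", "abc").set("key2", "val").build();
    writer.write(NullWritable.get(), record); // lazily sets up the table's delegate and records its path
    writer.close(taskContext);                // only closes the delegate writers; no commit happens here

    if (committer.needsTaskCommit(taskContext)) {
      committer.commitTask(taskContext);      // promotes task output for every recorded partition path
    }
    committer.commitJob(jobContext);          // promotes job output, then deletes _temporary_{jobId}
  }
}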
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java b/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java
new file mode 100644
index 0000000000..f4bdc6dc19
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/sink/TestDelegatingGCSOutputCommitter.java
@@ -0,0 +1,424 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.MRConstants;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for the DelegatingGCSOutputCommitter class.
+ */
+public class TestDelegatingGCSOutputCommitter {
+ private static Path outDir = new Path("file:///tmp/output");
+
+ // A random task attempt id for testing.
+ private static String attempt = "attempt_200707121733_0001_m_000000_0";
+ private static String partFile = "part-m-00000.avro";
+ private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+ private static String key1 = "key1";
+ private static String key2 = "key2";
+ private String schema = "{\"type\":\"record\",\"name\":\"text\",\"fields\":[" +
+ "{\"name\":\"key1\",\"type\":\"string\"}," +
+ "{\"name\":\"key2\",\"type\":\"string\"}]}";
+ private StructuredRecord record1 = StructuredRecord.builder(Schema.parseJson(schema))
+ .set(key1, "abc")
+ .set(key2, "val1")
+ .build();
+ private StructuredRecord record2 = StructuredRecord.builder(Schema.parseJson(schema))
+ .set(key1, "abc")
+ .set(key2, "record2")
+ .build();
+ private static final String tableName = "abc";
+ private static final String pathSuffix = LocalDate.now().format(DateTimeFormatter.ISO_DATE);
+
+ public TestDelegatingGCSOutputCommitter() throws IOException {
+ }
+
+ private void writeOutput(TaskAttemptContext context, DelegatingGCSOutputCommitter committer) throws IOException,
+ InterruptedException {
+ NullWritable nullWritable = NullWritable.get();
+ DelegatingGCSRecordWriter delegatingGCSRecordWriter = new DelegatingGCSRecordWriter(context, key1,
+ committer);
+ try {
+ delegatingGCSRecordWriter.write(nullWritable, record1);
+ delegatingGCSRecordWriter.write(nullWritable, record2);
+ } finally {
+ delegatingGCSRecordWriter.close(null);
+ }
+ }
+
+ private JobConf getConfiguration() {
+ JobConf conf = new JobConf();
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
+ conf.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ conf.setBoolean("fs.file.impl.disable.cache", true);
+ conf.set(DelegatingGCSOutputFormat.DELEGATE_CLASS,
+ "io.cdap.plugin.format.avro.output.StructuredAvroOutputFormat");
+ conf.set(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR, outDir.toString());
+ conf.set(DelegatingGCSOutputFormat.PARTITION_FIELD, "key1");
+ conf.set(DelegatingGCSOutputFormat.OUTPUT_PATH_SUFFIX, pathSuffix);
+ conf.set("avro.schema.output.key", schema);
+ return conf;
+ }
+
+ @Test
+ public void testRecoveryInternal() throws Exception {
+ JobConf conf = getConfiguration();
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ DelegatingGCSOutputCommitter committer = new DelegatingGCSOutputCommitter(tContext);
+ writeOutput(tContext, committer);
+ if (committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
+
+
+ String tempPath = String.format("%s/%s/%s/%s/%s", outDir.toUri(), tableName, pathSuffix,
+ "_temporary/1/", taskID.getTaskID());
+ Path jobTempDir1 = new Path(tempPath);
+ File jtd1 = new File(jobTempDir1.toUri().getPath());
+ assertTrue("Version 1 commits to temporary dir " + jtd1, jtd1.exists());
+ validateContent(jobTempDir1);
+
+ Assert.assertFalse(committer.isRecoverySupported(jContext));
+ FileUtil.fullyDelete(new File(outDir.toUri()));
+ }
+
+ private void validateContent(Path dir) throws IOException {
+ File fdir = new File(dir.toUri().getPath());
+ File expectedFile = new File(fdir, partFile);
+ StringBuffer expectedOutput = new StringBuffer();
+ expectedOutput.append(record1.get(key1).toString()).append('\t').append(record1.get(key2).toString()).append("\n");
+ expectedOutput.append(record2.get(key1).toString()).append('\t').append(record2.get(key2).toString()).append("\n");
+ String output = slurpAvro(expectedFile);
+ assertEquals(output, expectedOutput.toString());
+ }
+
+ @Test
+ public void testCommitterWithFailureV1() throws Exception {
+ testCommitterWithFailureInternal(1);
+ testCommitterWithFailureInternal(2);
+ }
+
+ private void testCommitterWithFailureInternal(int maxAttempts) throws Exception {
+ JobConf conf = getConfiguration();
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ DelegatingGCSOutputCommitter committer = new CommitterWithFailedThenSucceed(tContext);
+ // write output
+ writeOutput(tContext, committer);
+
+ // do commit
+ if (committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
+
+ try {
+ committer.commitJob(jContext);
+ if (maxAttempts <= 1) {
+ Assert.fail("Commit successful: wrong behavior for version 1.");
+ }
+    } catch (IOException e) {
+      // expected: the wrapped committer fails the first commitJob attempt
+    }
+
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+
+ @Test
+ public void testCommitterWithDuplicatedCommit() throws Exception {
+ testCommitterWithDuplicatedCommitInternal();
+ }
+
+ private void testCommitterWithDuplicatedCommitInternal() throws
+ Exception {
+ JobConf conf = getConfiguration();
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ 1);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ DelegatingGCSOutputCommitter committer = new DelegatingGCSOutputCommitter(tContext);
+ writeOutput(tContext, committer);
+ if (committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
+ committer.commitJob(jContext);
+
+ // validate output
+ validateContent(new Path(String.format("%s/%s/%s", outDir, tableName, pathSuffix)));
+
+ // commit again
+ committer.commitJob(jContext);
+    // The second commit does not fail: it finds no output paths because the first commit already deleted the
+    // _temporary_{jobID} directory that holds the per-task partition files with each table's output path.
+ FileUtil.fullyDelete(new File(outDir.toUri()));
+ }
+
+ private void testCommitterInternal() throws Exception {
+ JobConf conf = getConfiguration();
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ 1);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ DelegatingGCSOutputCommitter committer = new DelegatingGCSOutputCommitter(tContext);
+ writeOutput(tContext, committer);
+ if (committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
+ committer.commitJob(jContext);
+
+ // validate output
+ validateContent(new Path(String.format("%s/%s/%s", outDir, tableName, pathSuffix)));
+ FileUtil.fullyDelete(new File(outDir.toUri()));
+ }
+
+ @Test
+ public void testCommitter() throws Exception {
+ testCommitterInternal();
+ }
+
+ @Test
+ public void testMapOnlyNoOutput() throws Exception {
+ testMapOnlyNoOutputInternal();
+ }
+
+ private void testMapOnlyNoOutputInternal() throws Exception {
+ JobConf conf = getConfiguration();
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ 1);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ DelegatingGCSOutputCommitter committer = new DelegatingGCSOutputCommitter(tContext);
+ if (committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
+ committer.commitJob(jContext);
+
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+
+ private void testAbortInternal()
+ throws IOException, InterruptedException {
+ JobConf conf = getConfiguration();
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ 1);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ DelegatingGCSOutputCommitter committer = new DelegatingGCSOutputCommitter(tContext);
+
+ // write output
+ writeOutput(tContext, committer);
+
+ // do abort
+ committer.abortTask(tContext);
+
+ File out = new File(outDir.toUri().getPath());
+ String workPathDir = String.format("%s/%s/%s/%s/%s", outDir.toUri(), tableName, pathSuffix,
+ "_temporary/1/", attempt);
+ Path workPath = new Path(workPathDir); //temp attemptid path
+ File wp = new File(workPath.toUri().getPath());
+ File expectedFile = new File(wp, partFile);
+ assertFalse("task temp dir still exists", expectedFile.exists());
+
+ committer.abortJob(jContext, JobStatus.State.FAILED);
+ expectedFile = new File(out, String.format("%s_%s", FileOutputCommitter.TEMP_DIR_NAME, taskID.getJobID()));
+ assertFalse("job temp dir still exists", expectedFile.exists());
+ File tablePath = new File(out, String.format("%s/%s", tableName, pathSuffix));
+ assertEquals("Output directory not empty", 0, tablePath.listFiles().length);
+ FileUtil.fullyDelete(out);
+ }
+
+ @Test
+ public void testAbort() throws Exception {
+ testAbortInternal();
+ }
+
+ public static class FakeFileSystem extends RawLocalFileSystem {
+ public FakeFileSystem() {
+ super();
+ }
+
+ public URI getUri() {
+ return URI.create("file:///");
+ }
+
+ @Override
+ public boolean delete(Path p, boolean recursive) throws IOException {
+ throw new IOException("fake delete failed");
+ }
+ }
+
+
+ private void testFailAbortInternal()
+ throws IOException, InterruptedException {
+ JobConf conf = getConfiguration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+ conf.setClass("fs.file.impl", FakeFileSystem.class, FileSystem.class);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ DelegatingGCSOutputCommitter committer = new DelegatingGCSOutputCommitter(tContext);
+
+ // write output
+ writeOutput(tContext, committer);
+
+ File jobTmpDir = new File(
+ new Path(outDir.toUri().getPath(), tableName + Path.SEPARATOR + pathSuffix + Path.SEPARATOR +
+ FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+ conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0) +
+ Path.SEPARATOR +
+ FileOutputCommitter.TEMP_DIR_NAME).toString());
+ File taskTmpDir = new File(jobTmpDir, "" + taskID);
+ File expectedFile = new File(taskTmpDir, partFile);
+
+ // do abort
+ Throwable th = null;
+ try {
+ committer.abortTask(tContext);
+ } catch (IOException ie) {
+ th = ie;
+ }
+ assertNotNull(th);
+ assertTrue(th instanceof IOException);
+ assertTrue(th.getMessage().contains("fake delete failed"));
+ assertTrue(expectedFile + " does not exists", expectedFile.exists());
+
+ th = null;
+ try {
+ committer.abortJob(jContext, JobStatus.State.FAILED);
+ } catch (IOException ie) {
+ th = ie;
+ }
+ assertNotNull(th);
+ assertTrue(th instanceof IOException);
+ assertTrue(th.getMessage().contains("fake delete failed"));
+ assertTrue("job temp dir does not exists", jobTmpDir.exists());
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+
+ @Test
+ public void testFailAbort() throws Exception {
+ testFailAbortInternal();
+ }
+
+ public static String slurpAvro(File f) throws IOException {
+ StringBuffer expectedOutput = new StringBuffer();
+ try {
+ // Create a DatumReader for reading GenericRecord from Avro file
+      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+      // Create a DataFileReader for reading Avro file
+      try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(f, datumReader)) {
+
+ // Iterate over records in the Avro file
+ while (dataFileReader.hasNext()) {
+ // Read the next record
+ GenericRecord record = dataFileReader.next();
+ expectedOutput.append(record.get(key1)).append('\t').append(record.get(key2)).append("\n");
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return expectedOutput.toString();
+ }
+
+  /**
+   * The class wraps an overridden implementation of commitJobInternal which
+   * causes the commit to fail the first time and then succeed.
+   */
+ public static class CommitterWithFailedThenSucceed extends
+ DelegatingGCSOutputCommitter {
+ boolean firstTimeFail = true;
+
+ public CommitterWithFailedThenSucceed(TaskAttemptContext context) throws IOException {
+ super(context);
+ }
+
+ @Override
+ public void commitJob(JobContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped =
+ new CommitterFailedFirst(new Path(conf.get("mapreduce.output.fileoutputformat.outputdir")),
+ context);
+ wrapped.commitJob(context);
+ }
+ }
+
+ public static class CommitterFailedFirst extends
+ org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter {
+ boolean firstTimeFail = true;
+
+ public CommitterFailedFirst(Path outputPath,
+ JobContext context) throws IOException {
+ super(outputPath, context);
+ }
+
+ @Override
+ protected void commitJobInternal(org.apache.hadoop.mapreduce.JobContext
+ context) throws IOException {
+ super.commitJobInternal(context);
+ if (firstTimeFail) {
+ firstTimeFail = false;
+ throw new IOException();
+ } else {
+ // succeed then, nothing to do
+ }
+ }
+ }
+}