
Merge pull request data-integrations#1414 from cloudsufi/gcs-multisink-new-pr

[PLUGIN-1780] Changes done for GCS multisink issue.
vikasrathee-cs authored May 20, 2024
2 parents 2b86c51 + 8c0ce83 commit ec5bf50
Showing 4 changed files with 572 additions and 57 deletions.
DelegatingGCSOutputCommitter.java
@@ -16,54 +16,69 @@

 package io.cdap.plugin.gcp.gcs.sink;

+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

+import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;

 /**
  * Output Committer which creates and delegates operations to other GCS Output Committer instances.
  *
  * <p>
  * Delegated instances are created based on a supplied Output Format and Destination Table Names.
  */
 public class DelegatingGCSOutputCommitter extends OutputCommitter {
-  private final Map<String, OutputCommitter> committerMap;
-
-  public DelegatingGCSOutputCommitter() {
-    committerMap = new HashMap<>();
+  private final TaskAttemptContext taskAttemptContext;
+  private boolean firstTable = true;
+  private static final String PARTITIONS_FILE_SUFFIX = "_partitions.txt";
+
+  public DelegatingGCSOutputCommitter(TaskAttemptContext taskAttemptContext) {
+    this.taskAttemptContext = taskAttemptContext;
   }

   /**
    * Add a new GCSOutputCommitter based on a supplied Output Format and Table Name.
    *
    * <p>
    * This GCS Output Committer gets initialized when created.
    */
   @SuppressWarnings("rawtypes")
   public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
-                                                    TaskAttemptContext context,
                                                     String tableName) throws IOException, InterruptedException {
     //Set output directory
-    context.getConfiguration().set(FileOutputFormat.OUTDIR,
-                                   DelegatingGCSOutputUtils.buildOutputPath(context.getConfiguration(), tableName));
+    taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR,
+                                              DelegatingGCSOutputUtils.buildOutputPath(
+                                                taskAttemptContext.getConfiguration(), tableName));

     //Wrap output committer into the GCS Output Committer.
-    GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(context));
+    GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext));

-    //Initialize the new GCS Output Committer and add it to the Committer Map
-    gcsOutputCommitter.setupJob(context);
-    gcsOutputCommitter.setupTask(context);
-    committerMap.put(tableName, gcsOutputCommitter);
+    //Initialize the new GCS Output Committer and record its output path in this task's partitions file.
+    gcsOutputCommitter.setupJob(taskAttemptContext);
+    gcsOutputCommitter.setupTask(taskAttemptContext);
+    writePartitionFile(taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR), taskAttemptContext);
+    firstTable = false;
   }

   @Override
   public void setupJob(JobContext jobContext) throws IOException {
-    //no-op
+    Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+    FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
+    Path tempPath = new Path(outputPath, getPendingDirPath(jobContext.getJobID()));
+    fs.mkdirs(tempPath);
   }
@@ -73,39 +88,40 @@ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException

   @Override
   public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
-    if (committerMap.isEmpty()) {
-      return false;
-    }
-
-    boolean needsTaskCommit = true;
-
-    for (OutputCommitter committer : committerMap.values()) {
-      needsTaskCommit = needsTaskCommit && committer.needsTaskCommit(taskAttemptContext);
-    }
-
-    return needsTaskCommit;
+    return true;
   }

   @Override
   public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
-    for (OutputCommitter committer : committerMap.values()) {
+    for (String output : getOutputPaths(taskAttemptContext)) {
+      FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
       committer.commitTask(taskAttemptContext);
     }
   }

   @Override
   public void commitJob(JobContext jobContext) throws IOException {
-    for (OutputCommitter committer : committerMap.values()) {
+    for (String output : getOutputPaths(jobContext)) {
+      FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
       committer.commitJob(jobContext);
     }
+    cleanupJob(jobContext);
+  }
+
+  @Override
+  public void cleanupJob(JobContext jobContext) throws IOException {
+    Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+    FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
+    // delete the temporary directory that has partition information in text files.
+    fs.delete(new Path(outputPath, getPendingDirPath(jobContext.getJobID())), true);
   }

   @Override
   public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
     IOException ioe = null;
-
-    for (OutputCommitter committer : committerMap.values()) {
+    for (String output : getOutputPaths(taskAttemptContext)) {
       try {
+        FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
         committer.abortTask(taskAttemptContext);
       } catch (IOException e) {
         if (ioe == null) {
@@ -124,21 +140,108 @@ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException

   @Override
   public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
     IOException ioe = null;
-
-    for (OutputCommitter committer : committerMap.values()) {
-      try {
+    try {
+      for (String output : getOutputPaths(jobContext)) {
+        taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR, output);
+        FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
         committer.abortJob(jobContext, state);
-      } catch (IOException e) {
-        if (ioe == null) {
-          ioe = e;
-        } else {
-          ioe.addSuppressed(e);
-        }
       }
+    } catch (IOException e) {
+      if (ioe == null) {
+        ioe = e;
+      } else {
+        ioe.addSuppressed(e);
+      }
+    } finally {
+      cleanupJob(jobContext);
     }

     if (ioe != null) {
       throw ioe;
     }
   }
+
+  // Returns output paths for the whole job, based on the JobContext configuration.
+  private Set<String> getOutputPaths(JobContext jobContext) throws IOException {
+    Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+    FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
+    return getOutputPathsFromTempPartitionFile(outputPath, fs, null, jobContext.getJobID());
+  }
+
+  // Returns output paths recorded by the current task attempt only.
+  private Set<String> getOutputPaths(TaskAttemptContext taskAttemptContext) throws IOException {
+    Path outputPath = new Path(
+      taskAttemptContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+    FileSystem fs = outputPath.getFileSystem(taskAttemptContext.getConfiguration());
+    return getOutputPathsFromTempPartitionFile(outputPath, fs,
+                                               taskAttemptContext.getTaskAttemptID().getTaskID().toString(),
+                                               taskAttemptContext.getJobID());
+  }
+
+  /**
+   * Returns the full output paths (up to the path suffix) read from the {taskId}_partitions.txt files.
+   * When called from a task context (taskId non-null), it reads only that task's file; otherwise it reads
+   * the files of every task in the job.
+   *
+   * @param baseOutputPath base output directory that contains the temporary partitions directory
+   * @param fs             file system to read the partitions files from
+   * @param taskId         id of the task whose partitions file should be read, or null to read all tasks' files
+   * @param jobID          id of the job, used to locate the temporary partitions directory
+   * @return set of output paths read from the partitions file(s)
+   * @throws IOException if the partitions files cannot be read
+   */
+  private Set<String> getOutputPathsFromTempPartitionFile(Path baseOutputPath, FileSystem fs, @Nullable String taskId,
+                                                          JobID jobID) throws IOException {
+    Set<String> outputPaths = new HashSet<>();
+    Path tempPath = taskId == null ? new Path(baseOutputPath, getPendingDirPath(jobID))
+      : new Path(baseOutputPath, String.format("%s/%s%s", getPendingDirPath(jobID), taskId,
+                                               PARTITIONS_FILE_SUFFIX));
+
+    if (!fs.exists(tempPath)) {
+      return outputPaths;
+    }
+
+    for (FileStatus status : fs.listStatus(tempPath)) {
+      if (status.getPath().getName().endsWith(PARTITIONS_FILE_SUFFIX)) {
+        try (FSDataInputStream dis = fs.open(status.getPath())) {
+          while (true) {
+            try {
+              outputPaths.add(dis.readUTF());
+            } catch (EOFException e) {
+              break;
+            }
+          }
+        }
+      }
+    }
+    return outputPaths;
+  }
+
+  /**
+   * Creates a _temporary_{jobID} directory under the base directory, and within it a {taskId}_partitions.txt
+   * file that stores the full output path up to the path suffix, e.g. gs://basepath/tablename/path_suffix.
+   *
+   * @param path    split output path, up to the split field name, to record for the current task
+   * @param context task attempt context of the current task
+   * @throws IOException if the partitions file cannot be created or appended to
+   */
+  private void writePartitionFile(String path, TaskAttemptContext context) throws IOException {
+    Path outputPath = new Path(context.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
+    Path tempPath = new Path(outputPath, getPendingDirPath(context.getJobID()));
+    FileSystem fs = tempPath.getFileSystem(context.getConfiguration());
+    String taskId = context.getTaskAttemptID().getTaskID().toString();
+    Path taskPartitionFile = new Path(tempPath, String.format("%s%s", taskId, PARTITIONS_FILE_SUFFIX));
+    if (!fs.exists(taskPartitionFile)) {
+      fs.createNewFile(taskPartitionFile);
+    } else if (firstTable) {
+      fs.create(taskPartitionFile, true);
+    }
+    try (DataOutputStream out = fs.append(taskPartitionFile)) {
+      out.writeUTF(path);
+    }
+  }
+
+  // Creates the directory name _temporary_{jobId} used for the partition files.
+  // The job ID is added as a suffix so that multiple pipelines can write to the same path in parallel.
+  private String getPendingDirPath(JobID jobId) {
+    return String.format("%s_%s", FileOutputCommitter.PENDING_DIR_NAME, jobId);
+  }
 }
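For readers following the new commit flow: the committer above coordinates tasks through a small framing protocol. Each task appends writeUTF entries to its {taskId}_partitions.txt file, and readers loop on readUTF until EOFException to recover every recorded path. The minimal sketch below shows that round trip with plain java.io streams standing in for Hadoop's fs.append and FSDataInputStream; the file name and gs:// paths are illustrative, not taken from the commit.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class PartitionsFileSketch {
  public static void main(String[] args) throws IOException {
    File partitionsFile = File.createTempFile("task_local1_0001_m_000000", "_partitions.txt");

    // Task side: append one entry per table the task opened (cf. writePartitionFile).
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(partitionsFile, true))) {
      out.writeUTF("gs://bucket/basepath/tableA/path_suffix");
      out.writeUTF("gs://bucket/basepath/tableB/path_suffix");
    }

    // Commit side: read entries back until EOF (cf. getOutputPathsFromTempPartitionFile).
    Set<String> outputPaths = new HashSet<>();
    try (DataInputStream in = new DataInputStream(new FileInputStream(partitionsFile))) {
      while (true) {
        try {
          outputPaths.add(in.readUTF()); // writeUTF length-prefixes entries, so this never tears
        } catch (EOFException e) {
          break; // end of file: all recorded paths recovered
        }
      }
    }
    System.out.println(outputPaths);
  }
}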
DelegatingGCSOutputFormat.java
@@ -20,7 +20,6 @@

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -39,10 +38,8 @@ public class DelegatingGCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {

   public static final String DELEGATE_CLASS = "delegating_output_format.delegate";
   public static final String OUTPUT_PATH_BASE_DIR = "delegating_output_format.output.path.base";
   public static final String OUTPUT_PATH_SUFFIX = "delegating_output_format.output.path.suffix";
-  private final DelegatingGCSOutputCommitter outputCommitter;

   public DelegatingGCSOutputFormat() {
-    this.outputCommitter = new DelegatingGCSOutputCommitter();
   }

   /**
@@ -65,7 +62,7 @@ public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext context)

     Configuration hConf = context.getConfiguration();
     String partitionField = hConf.get(PARTITION_FIELD);

-    return new DelegatingGCSRecordWriter(context, partitionField, outputCommitter);
+    return new DelegatingGCSRecordWriter(context, partitionField, getOutputCommitter(context));
   }

   @Override
@@ -74,8 +71,8 @@ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {

   }

   @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return outputCommitter;
+  public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    return new DelegatingGCSOutputCommitter(context);
   }

 }
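The change above moves committer state out of the output format: getOutputCommitter now builds a fresh DelegatingGCSOutputCommitter from each task attempt context instead of returning one field shared across attempts. A hypothetical sketch of that behavior, assuming hadoop-client and the plugin classes are on the classpath; the attempt ids are illustrative.

import io.cdap.plugin.gcp.gcs.sink.DelegatingGCSOutputCommitter;
import io.cdap.plugin.gcp.gcs.sink.DelegatingGCSOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class PerAttemptCommitterSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    DelegatingGCSOutputFormat format = new DelegatingGCSOutputFormat();

    // Two task attempts ask the same format instance for a committer.
    DelegatingGCSOutputCommitter first = format.getOutputCommitter(
      new TaskAttemptContextImpl(conf, TaskAttemptID.forName("attempt_local1_0001_m_000000_0")));
    DelegatingGCSOutputCommitter second = format.getOutputCommitter(
      new TaskAttemptContextImpl(conf, TaskAttemptID.forName("attempt_local1_0001_m_000001_0")));

    // Each committer is bound to its own attempt context: no shared mutable state.
    System.out.println(first != second); // true
  }
}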
DelegatingGCSRecordWriter.java
@@ -21,8 +21,6 @@

 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.HashMap;
@@ -34,7 +32,6 @@

  * This Record Writer will initialize record writers and Output Committers as needed.
  */
 public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
-  private static final Logger LOG = LoggerFactory.getLogger(DelegatingGCSRecordWriter.class);
   private final TaskAttemptContext context;
   private final String partitionField;
   private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
@@ -63,7 +60,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException, InterruptedException

       DelegatingGCSOutputUtils.getDelegateFormat(context.getConfiguration());

       //Initialize GCS Output Committer for this format.
-      delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, context, tableName);
+      delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, tableName);

       //Add record writer to delegate map.
       delegate = format.getRecordWriter(context);
@@ -79,12 +76,6 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedException

     for (RecordWriter<NullWritable, StructuredRecord> delegate : delegateMap.values()) {
       delegate.close(context);
     }
-
-    // Call the Commit Task and Commit Job implementations of this plugin to copy files into their final directory.
-    // We need to do this at this stage because the OutputCommitter needs to be aware of the different partitions
-    // that have been stored so far.
-    delegatingGCSOutputCommitter.commitTask(context);
-    delegatingGCSOutputCommitter.commitJob(context);
   }

 }
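With the manual commitTask and commitJob calls removed from close(), commits are now driven by Hadoop's standard OutputCommitter protocol, and the delegating committer replays that protocol per output path recorded in the partitions file. A minimal local sketch of the sequence, assuming hadoop-client on the classpath and a writable /tmp; the paths and ids are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class CommitFlowSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    TaskAttemptID attempt = TaskAttemptID.forName("attempt_local1_0001_m_000000_0");
    TaskAttemptContextImpl ctx = new TaskAttemptContextImpl(conf, attempt);

    // One FileOutputCommitter per output path, exactly as the delegating committer
    // does for each entry read back from the partitions file.
    FileOutputCommitter committer =
      new FileOutputCommitter(new Path("file:///tmp/multisink-demo/tableA"), ctx);

    committer.setupJob(ctx);    // framework: once per job
    committer.setupTask(ctx);   // framework: once per task attempt
    // ... a real task writes files under committer.getWorkPath() here ...
    if (committer.needsTaskCommit(ctx)) {
      committer.commitTask(ctx); // promote attempt output to the task's committed dir
    }
    committer.commitJob(ctx);    // promote committed task dirs to the final output path
  }
}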
(Diff for the fourth changed file was not captured.)
