Skip to content

Commit

Permalink
rework
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasrathee-cs committed May 3, 2024
1 parent 2b5105c commit f7d2b2e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
Expand All @@ -44,10 +44,12 @@
public class DelegatingGCSOutputCommitter extends OutputCommitter {

private TaskAttemptContext taskAttemptContext;
private final Map<String, OutputCommitter> committerMap;
private static final String PARTITIONS_FILE_SUFFIX = "_partitions.txt";


public DelegatingGCSOutputCommitter() {
committerMap = new HashMap<>();
}

// Setting Task Context to committer to use at the time of commit job to get task details
Expand All @@ -74,6 +76,7 @@ public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
gcsOutputCommitter.setupJob(context);
gcsOutputCommitter.setupTask(context);
writePartitionFile(context.getConfiguration().get(FileOutputFormat.OUTDIR), context);
committerMap.put(tableName, gcsOutputCommitter);
}

@Override
Expand All @@ -92,24 +95,29 @@ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException

@Override
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
return true;
if (committerMap.isEmpty()) {
return false;
}

boolean needsTaskCommit = true;

for (OutputCommitter committer : committerMap.values()) {
needsTaskCommit = needsTaskCommit && committer.needsTaskCommit(taskAttemptContext);
}

return needsTaskCommit;
}

@Override
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
for (String output : getOutputPaths(taskAttemptContext)) {
taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR, output);
FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
for (OutputCommitter committer : committerMap.values()) {
committer.commitTask(taskAttemptContext);
}
}

@Override
public void commitJob(JobContext jobContext) throws IOException {
for (String output : getOutputPaths(jobContext)) {
jobContext.getConfiguration().set(FileOutputFormat.OUTDIR, output);
// todo this taskAttemptContext is coming from Driver class with mapper context and is same for all output paths,
// even though it is working fine still needs to check how it works
FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
committer.commitJob(jobContext);
}
Expand All @@ -127,10 +135,8 @@ public void cleanupJob(JobContext jobContext) throws IOException {
@Override
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
IOException ioe = null;
for (String output : getOutputPaths(taskAttemptContext)) {
for (OutputCommitter committer : committerMap.values()) {
try {
taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR, output);
FileOutputCommitter committer = new FileOutputCommitter(new Path(output), taskAttemptContext);
committer.abortTask(taskAttemptContext);
} catch (IOException e) {
if (ioe == null) {
Expand Down Expand Up @@ -161,9 +167,10 @@ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOExce
} else {
ioe.addSuppressed(e);
}
} finally {
cleanupJob(jobContext);
}
}
cleanupJob(jobContext);
if (ioe != null) {
throw ioe;
}
Expand All @@ -176,13 +183,6 @@ private Set<String> getOutputPaths(JobContext jobContext) throws IOException {
return getOutputPathsFromTempPartitionFile(outputPath, fs);
}

// return path lists based on TaskAttemptContext configuration.
private Set<String> getOutputPaths(TaskAttemptContext context) throws IOException {
Path outputPath = new Path(context.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
return getOutputPathsFromTempPartitionFile(outputPath, fs);
}

/**
* This method will return the full path up to path suffix after reading from partitions.txt file
*
Expand All @@ -198,8 +198,7 @@ private Set<String> getOutputPathsFromTempPartitionFile(Path baseOutputPath, Fil
for (FileStatus status : fs.listStatus(tempPath)) {
if (status.getPath().getName().endsWith(PARTITIONS_FILE_SUFFIX)) {
try (FSDataInputStream dis = fs.open(status.getPath());
DataInputStream in = new DataInputStream(new BufferedInputStream(dis));
BufferedReader br = new BufferedReader(new java.io.InputStreamReader(in))) {
BufferedReader br = new BufferedReader(new java.io.InputStreamReader(dis))) {
String line;
while ((line = br.readLine()) != null) {
outputPaths.add(line);
Expand All @@ -226,6 +225,8 @@ private void writePartitionFile(String path, TaskAttemptContext context) throws
Path taskPartitionFile = new Path(tempPath, String.format("%s%s", taskId, PARTITIONS_FILE_SUFFIX));
if (!fs.exists(taskPartitionFile)) {
fs.createNewFile(taskPartitionFile);
} else if (committerMap.isEmpty()) {
fs.create(taskPartitionFile, true);
}
try (DataOutputStream out = fs.append(taskPartitionFile)) {
out.writeBytes(path + "\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public static Map<String, String> configure(String delegateClassName,
public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext context) {
Configuration hConf = context.getConfiguration();
String partitionField = hConf.get(PARTITION_FIELD);

return new DelegatingGCSRecordWriter(context, partitionField, outputCommitter);
}

Expand Down

0 comments on commit f7d2b2e

Please sign in to comment.