Skip to content

Commit

Permalink
[HUDI-1804] Continue to write when Flink write task restart because o…
Browse files Browse the repository at this point in the history
…f container killing (apache#2843)

The `FlinkMergeHandle` creates a marker file under the metadata path
each time it initializes; when a write task restarts after being killed, it
tries to re-create the existing file and reports an error.

To solve this problem, skip the creation and use the original data file
as base file to merge.
  • Loading branch information
danny0405 authored Apr 19, 2021
1 parent f7b6b68 commit dab5114
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,56 @@ private String stripMarkerFolderPrefix(String fullMarkerPath) {
* The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
*/
public Path create(String partitionPath, String dataFileName, IOType type) {
  // Resolve (and lazily create) the partition directory, then build the marker path.
  Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
  try {
    LOG.info("Creating Marker Path=" + markerPath);
    // create(path, false): fail if the marker already exists — a duplicate
    // marker means a conflicting writer, which must surface as an error here.
    fs.create(markerPath, false).close();
  } catch (IOException e) {
    throw new HoodieException("Failed to create marker file " + markerPath, e);
  }
  return markerPath;
}

/**
* The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
*
* @return true if the marker file is created successfully,
*         false if it already exists
*/
public boolean createIfNotExists(String partitionPath, String dataFileName, IOType type) {
  // Resolve (and lazily create) the partition directory, then build the marker path.
  Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
  try {
    if (fs.exists(markerPath)) {
      // A restarted write task may have left this marker behind; treat it as
      // "already created" instead of failing, so the task can resume writing.
      LOG.warn("Marker Path=" + markerPath + " already exists, cancel creation");
      return false;
    }
    LOG.info("Creating Marker Path=" + markerPath);
    fs.create(markerPath, false).close();
  } catch (IOException e) {
    throw new HoodieException("Failed to create marker file " + markerPath, e);
  }
  return true;
}

/**
* Returns the marker path. Would create the partition path first if not exists.
*
* @param partitionPath The partition path
* @param dataFileName The data file name
* @param type The IO type
* @return path of the marker file
*/
private Path getMarkerPath(String partitionPath, String dataFileName, IOType type) {
  // Partition directory under the marker root; shared by all markers of this partition.
  Path path = FSUtils.getPartitionPath(markerDirPath, partitionPath);
  try {
    if (!fs.exists(path)) {
      fs.mkdirs(path); // create a new partition as needed.
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to make dir " + path, e);
  }
  // Marker file name: <dataFileName><MARKER_EXTN>.<IOType>, e.g. f1.parquet.marker.CREATE
  String markerFileName = String.format("%s%s.%s", dataFileName, HoodieTableMetaClient.MARKER_EXTN, type.name());
  return new Path(path, markerFileName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;

import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
Expand All @@ -35,7 +36,6 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches).
Expand Down Expand Up @@ -68,13 +68,23 @@ public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTab
taskContextSupplier);
}

/**
* Called by the compactor code path.
*/
public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier);
/**
 * Creates the marker file, tolerating a marker left behind by a previous
 * attempt of the same write task (e.g. restarted after its container was killed).
 */
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
  MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
  boolean created = markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
  if (!created) {
    // If the marker file already exists, the write task was pulled up again
    // with the same data file name; remove the legacy data file first so
    // this attempt starts from a clean slate.
    try {
      if (fs.exists(path)) {
        fs.delete(path, false);
        LOG.warn("Legacy data file: " + path + " delete success");
      }
    } catch (IOException e) {
      throw new HoodieException("Error while deleting legacy data file: " + path, e);
    }
  }
}

/**
Expand Down Expand Up @@ -109,7 +119,7 @@ protected long computeTotalWriteBytes() throws IOException {
}

@Override
protected long computeFileSizeInBytes() throws IOException {
protected long computeFileSizeInBytes() {
return fileWriter.getBytesWritten();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;

import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
Expand Down Expand Up @@ -70,13 +71,16 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
/**
* Records the rolled over file paths.
*/
private final List<Path> rolloverPaths;
private List<Path> rolloverPaths;

public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                        Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                        TaskContextSupplier taskContextSupplier) {
  super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
  if (rolloverPaths == null) {
    // #createMarkerFile (invoked during super construction) may have
    // initialized it already; only allocate a fresh list when it has not.
    rolloverPaths = new ArrayList<>();
  }
}

/**
Expand Down Expand Up @@ -104,6 +108,25 @@ boolean needsUpdateLocation() {
return false;
}

/**
 * Creates the marker file, tolerating a marker left behind by a previous
 * attempt of the same write task (e.g. restarted after its container was killed).
 */
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
  MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
  boolean created = markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
  if (!created) {
    // If the marker file already exists, the write task was pulled up again
    // with the same data file name; perform the rollover action here:
    // use the new file path as the base file path (file1),
    // and generate a new file path with a rollover number (file2).
    // The incremental data set then merges into file2 instead of file1.
    //
    // When the task does finalization in #finishWrite, the intermediate files are cleaned up.
    oldFilePath = newFilePath;
    rolloverPaths = new ArrayList<>();
    rolloverPaths.add(oldFilePath);
    newFilePath = makeNewFilePathWithRollover();
  }
}

/**
*
* Rolls over the write handle to prepare for the next batch write.
Expand Down Expand Up @@ -132,11 +155,7 @@ public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) {

rolloverPaths.add(newFilePath);
oldFilePath = newFilePath;
// Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
String newFileName = generatesDataFileNameWithRollover();
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
+ newFileName).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
newFilePath = makeNewFilePathWithRollover();

try {
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
Expand All @@ -148,6 +167,16 @@ public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) {
newFilePath.toString()));
}

/**
* Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
*/
private Path makeNewFilePathWithRollover() {
  // New file name is fileId + "-" + rollNumber, produced by the base handle.
  final String rolledName = generatesDataFileNameWithRollover();
  final String prefix = partitionPath.isEmpty() ? "" : partitionPath + "/";
  return new Path(config.getBasePath(), new Path(prefix + rolledName).toString());
}

public void finishWrite() {
// The file visibility should be kept by the configured ConsistencyGuard instance.
rolloverPaths.add(newFilePath);
Expand Down

0 comments on commit dab5114

Please sign in to comment.