diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java index 40be741025e8..d777db40f320 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java @@ -189,23 +189,56 @@ private String stripMarkerFolderPrefix(String fullMarkerPath) { * The marker path will be /.hoodie/.temp//2019/04/25/filename.marker.writeIOType. */ public Path create(String partitionPath, String dataFileName, IOType type) { - Path path = FSUtils.getPartitionPath(markerDirPath, partitionPath); + Path markerPath = getMarkerPath(partitionPath, dataFileName, type); try { - if (!fs.exists(path)) { - fs.mkdirs(path); // create a new partition as needed. - } + LOG.info("Creating Marker Path=" + markerPath); + fs.create(markerPath, false).close(); } catch (IOException e) { - throw new HoodieIOException("Failed to make dir " + path, e); + throw new HoodieException("Failed to create marker file " + markerPath, e); } - String markerFileName = String.format("%s%s.%s", dataFileName, HoodieTableMetaClient.MARKER_EXTN, type.name()); - Path markerPath = new Path(path, markerFileName); + return markerPath; + } + + /** + * The marker path will be /.hoodie/.temp//2019/04/25/filename.marker.writeIOType. 
+ * + * @return true if the marker file is created successfully, + * false if it already exists + */ + public boolean createIfNotExists(String partitionPath, String dataFileName, IOType type) { + Path markerPath = getMarkerPath(partitionPath, dataFileName, type); try { + if (fs.exists(markerPath)) { + LOG.warn("Marker Path=" + markerPath + " already exists, cancel creation"); + return false; + } LOG.info("Creating Marker Path=" + markerPath); fs.create(markerPath, false).close(); } catch (IOException e) { throw new HoodieException("Failed to create marker file " + markerPath, e); } - return markerPath; + return true; + } + + /** + * Returns the marker path. Creates the partition path first if it does not exist. + * + * @param partitionPath The partition path + * @param dataFileName The data file name + * @param type The IO type + * @return path of the marker file + */ + private Path getMarkerPath(String partitionPath, String dataFileName, IOType type) { + Path path = FSUtils.getPartitionPath(markerDirPath, partitionPath); + try { + if (!fs.exists(path)) { + fs.mkdirs(path); // create a new partition as needed. 
+ } + } catch (IOException e) { + throw new HoodieIOException("Failed to make dir " + path, e); + } + String markerFileName = String.format("%s%s.%s", dataFileName, HoodieTableMetaClient.MARKER_EXTN, type.name()); + return new Path(path, markerFileName); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index 2abefa91f1d9..556d98e35e0a 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -20,13 +20,14 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; import org.apache.avro.Schema; import org.apache.log4j.LogManager; @@ -35,7 +36,6 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.Map; /** * A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches). @@ -68,13 +68,23 @@ public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTab taskContextSupplier); } - /** - * Called by the compactor code path. 
- */ - public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, - String partitionPath, String fileId, Map> recordMap, - TaskContextSupplier taskContextSupplier) { - super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier); + @Override + protected void createMarkerFile(String partitionPath, String dataFileName) { + MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); + boolean created = markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType()); + if (!created) { + // If the marker file already exists, that means the write task + // was pulled up again with same data file name, removes the legacy + // data file first. + try { + if (fs.exists(path)) { + fs.delete(path, false); + LOG.warn("Legacy data file: " + path + " delete success"); + } + } catch (IOException e) { + throw new HoodieException("Error while deleting legacy data file: " + path, e); + } + } } /** @@ -109,7 +119,7 @@ protected long computeTotalWriteBytes() throws IOException { } @Override - protected long computeFileSizeInBytes() throws IOException { + protected long computeFileSizeInBytes() { return fileWriter.getBytesWritten(); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 244c56bc0092..4da64049bafb 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -27,6 +27,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -70,13 +71,16 @@ public class FlinkMergeHandle /** * Records the rolled over file paths. 
*/ - private final List rolloverPaths; + private List rolloverPaths; public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier); - rolloverPaths = new ArrayList<>(); + if (rolloverPaths == null) { + // #createMarkerFile may have initialized it already + rolloverPaths = new ArrayList<>(); + } } /** @@ -104,6 +108,25 @@ boolean needsUpdateLocation() { return false; } + @Override + protected void createMarkerFile(String partitionPath, String dataFileName) { + MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); + boolean created = markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType()); + if (!created) { + // If the marker file already exists, that means the write task + // was pulled up again with same data file name, performs rolling over action here: + // use the new file path as the base file path (file1), + // and generates new file path with roll over number (file2). + // the incremental data set would merge into the file2 instead of file1. + // + // When the task does finalization in #finishWrite, the intermediate files would be cleaned. + oldFilePath = newFilePath; + rolloverPaths = new ArrayList<>(); + rolloverPaths.add(oldFilePath); + newFilePath = makeNewFilePathWithRollover(); + } + } + /** * * Rolls over the write handle to prepare for the next batch write. @@ -132,11 +155,7 @@ public void rollOver(Iterator> newRecordsItr) { rolloverPaths.add(newFilePath); oldFilePath = newFilePath; - // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. - String newFileName = generatesDataFileNameWithRollover(); - String relativePath = new Path((partitionPath.isEmpty() ? 
"" : partitionPath + "/") - + newFileName).toString(); - newFilePath = new Path(config.getBasePath(), relativePath); + newFilePath = makeNewFilePathWithRollover(); try { fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier); @@ -148,6 +167,16 @@ public void rollOver(Iterator> newRecordsItr) { newFilePath.toString())); } + /** + * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + */ + private Path makeNewFilePathWithRollover() { + String newFileName = generatesDataFileNameWithRollover(); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + newFileName).toString(); + return new Path(config.getBasePath(), relativePath); + } + public void finishWrite() { // The file visibility should be kept by the configured ConsistencyGuard instance. rolloverPaths.add(newFilePath);