diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index 1ed4ea39f4e5..7abe7743cff9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -27,9 +27,11 @@
 import org.apache.paimon.utils.PathFactory;
 import org.apache.paimon.utils.Preconditions;
 
+import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,10 +48,6 @@ public class DeletionVectorIndexFileWriter {
     private final boolean isWrittenToMulitFiles;
     private final long targetSizeInBytes;
 
-    private boolean written = false;
-
-    private long writtenSizeInBytes = 0L;
-
     public DeletionVectorIndexFileWriter(
             FileIO fileIO,
             PathFactory pathFactory,
@@ -63,35 +61,39 @@ public DeletionVectorIndexFileWriter(
 
     public List<IndexFileMeta> write(Map<String, DeletionVector> input) throws IOException {
         List<IndexFileMeta> result = new ArrayList<>();
-        SingleIndexFileWriter writer = createWriter();
-
-        for (Map.Entry<String, DeletionVector> entry : input.entrySet()) {
-            String dataFile = entry.getKey();
-            byte[] valueBytes = entry.getValue().serializeToBytes();
-            int currentSize = valueBytes.length;
-
-            if (isWrittenToMulitFiles
-                    && written
-                    && writtenSizeInBytes + currentSize > targetSizeInBytes) {
-                result.add(writer.closeWriter());
-                writer = createWriter();
-            }
-
-            writer.write(dataFile, valueBytes);
-            written = true;
-            writtenSizeInBytes += currentSize;
+        Iterator<Map.Entry<String, DeletionVector>> iterator = input.entrySet().iterator();
+        while (iterator.hasNext()) {
+            result.add(tryWriter(iterator));
         }
-        result.add(writer.closeWriter());
         return result;
     }
 
-    private SingleIndexFileWriter createWriter() throws IOException {
-        written = false;
-        writtenSizeInBytes = 0L;
-        return new SingleIndexFileWriter(fileIO, indexPathFactory.newPath());
+    /**
+     * Drains entries from {@code iterator} into one new index file. In multi-file mode it stops
+     * after an entry once written bytes plus that entry's size exceed the target; always closes.
+     */
+    private IndexFileMeta tryWriter(Iterator<Map.Entry<String, DeletionVector>> iterator)
+            throws IOException {
+        SingleIndexFileWriter writer =
+                new SingleIndexFileWriter(fileIO, indexPathFactory.newPath());
+        try {
+            while (iterator.hasNext()) {
+                Map.Entry<String, DeletionVector> entry = iterator.next();
+                long currentSize = writer.write(entry.getKey(), entry.getValue());
+
+                if (isWrittenToMulitFiles
+                        && writer.hasWritten()
+                        && writer.writtenSizeInBytes() + currentSize > targetSizeInBytes) {
+                    break;
+                }
+            }
+        } finally {
+            writer.close();
+        }
+        return writer.writtenIndexFile();
     }
 
-    static class SingleIndexFileWriter {
+    static class SingleIndexFileWriter implements Closeable {
 
         private final FileIO fileIO;
         private final Path path;
@@ -100,6 +102,8 @@ static class SingleIndexFileWriter {
 
         private final LinkedHashMap<String, Pair<Integer, Integer>> dvRanges;
 
+        private long writtenSizeInBytes = 0L;
+
         public SingleIndexFileWriter(FileIO fileIO, Path path) throws IOException {
             this.fileIO = fileIO;
             this.path = path;
@@ -108,19 +112,32 @@ public SingleIndexFileWriter(FileIO fileIO, Path path) throws IOException {
             this.dvRanges = new LinkedHashMap<>();
         }
 
-        public long write(String key, byte[] data) throws IOException {
+        /** Returns {@code true} once at least one deletion vector has been written. */
+        public boolean hasWritten() {
+            return !dvRanges.isEmpty();
+        }
+
+        /** Returns the payload bytes written so far, excluding length and checksum ints. */
+        public long writtenSizeInBytes() {
+            return this.writtenSizeInBytes;
+        }
+
+        /** Writes the vector as length, payload, checksum and returns the payload length. */
+        public long write(String key, DeletionVector deletionVector) throws IOException {
             Preconditions.checkNotNull(dataOutputStream);
+            byte[] data = deletionVector.serializeToBytes();
             int size = data.length;
 
             dvRanges.put(key, Pair.of(dataOutputStream.size(), size));
             dataOutputStream.writeInt(size);
             dataOutputStream.write(data);
             dataOutputStream.writeInt(calculateChecksum(data));
+            writtenSizeInBytes += size;
             return size;
         }
 
-        public IndexFileMeta closeWriter() throws IOException {
-            dataOutputStream.close();
+        /** Builds the metadata describing this index file; does not close the stream. */
+        public IndexFileMeta writtenIndexFile() throws IOException {
             return new IndexFileMeta(
                     DELETION_VECTORS_INDEX,
                     path.getName(),
@@ -128,5 +145,11 @@ public IndexFileMeta closeWriter() throws IOException {
                     dvRanges.size(),
                     dvRanges);
         }
+
+        /** Closes the underlying data output stream. */
+        @Override
+        public void close() throws IOException {
+            dataOutputStream.close();
+        }
     }
 }