Skip to content

Commit

Permalink
[followup] comments
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron committed May 29, 2024
1 parent 5b1c7ed commit d1b0dca
Showing 1 changed file with 44 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.Preconditions;

import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -46,10 +48,6 @@ public class DeletionVectorIndexFileWriter {
private final boolean isWrittenToMulitFiles;
private final long targetSizeInBytes;

private boolean written = false;

private long writtenSizeInBytes = 0L;

public DeletionVectorIndexFileWriter(
FileIO fileIO,
PathFactory pathFactory,
Expand All @@ -63,35 +61,35 @@ public DeletionVectorIndexFileWriter(

public List<IndexFileMeta> write(Map<String, DeletionVector> input) throws IOException {
List<IndexFileMeta> result = new ArrayList<>();
SingleIndexFileWriter writer = createWriter();

for (Map.Entry<String, DeletionVector> entry : input.entrySet()) {
String dataFile = entry.getKey();
byte[] valueBytes = entry.getValue().serializeToBytes();
int currentSize = valueBytes.length;

if (isWrittenToMulitFiles
&& written
&& writtenSizeInBytes + currentSize > targetSizeInBytes) {
result.add(writer.closeWriter());
writer = createWriter();
}

writer.write(dataFile, valueBytes);
written = true;
writtenSizeInBytes += currentSize;
Iterator<Map.Entry<String, DeletionVector>> iterator = input.entrySet().iterator();
while (iterator.hasNext()) {
result.add(tryWriter(iterator));
}
result.add(writer.closeWriter());
return result;
}

private SingleIndexFileWriter createWriter() throws IOException {
written = false;
writtenSizeInBytes = 0L;
return new SingleIndexFileWriter(fileIO, indexPathFactory.newPath());
private IndexFileMeta tryWriter(Iterator<Map.Entry<String, DeletionVector>> iterator)
throws IOException {
SingleIndexFileWriter writer =
new SingleIndexFileWriter(fileIO, indexPathFactory.newPath());
try {
while (iterator.hasNext()) {
Map.Entry<String, DeletionVector> entry = iterator.next();
long currentSize = writer.write(entry.getKey(), entry.getValue());

if (isWrittenToMulitFiles
&& !writer.hasWritten()
&& writer.writtenSizeInBytes() + currentSize > targetSizeInBytes) {
break;
}
}
} finally {
writer.close();
}
return writer.writtenIndexFile();
}

static class SingleIndexFileWriter {
class SingleIndexFileWriter implements Closeable {
private final FileIO fileIO;

private final Path path;
Expand All @@ -100,6 +98,8 @@ static class SingleIndexFileWriter {

private final LinkedHashMap<String, Pair<Integer, Integer>> dvRanges;

private long writtenSizeInBytes = 0L;

public SingleIndexFileWriter(FileIO fileIO, Path path) throws IOException {
this.fileIO = fileIO;
this.path = path;
Expand All @@ -108,25 +108,39 @@ public SingleIndexFileWriter(FileIO fileIO, Path path) throws IOException {
this.dvRanges = new LinkedHashMap<>();
}

public long write(String key, byte[] data) throws IOException {
public boolean hasWritten() {
return dvRanges.isEmpty();
}

public long writtenSizeInBytes() {
return this.writtenSizeInBytes;
}

public long write(String key, DeletionVector deletionVector) throws IOException {
Preconditions.checkNotNull(dataOutputStream);
byte[] data = deletionVector.serializeToBytes();
int size = data.length;

dvRanges.put(key, Pair.of(dataOutputStream.size(), size));
dataOutputStream.writeInt(size);
dataOutputStream.write(data);
dataOutputStream.writeInt(calculateChecksum(data));
writtenSizeInBytes += size;
return size;
}

public IndexFileMeta closeWriter() throws IOException {
dataOutputStream.close();
public IndexFileMeta writtenIndexFile() throws IOException {
return new IndexFileMeta(
DELETION_VECTORS_INDEX,
path.getName(),
fileIO.getFileSize(path),
dvRanges.size(),
dvRanges);
}

@Override
public void close() throws IOException {
dataOutputStream.close();
}
}
}

0 comments on commit d1b0dca

Please sign in to comment.