Skip to content

Commit

Permalink
[core] add deletedFiles in NewFilesIncrement (apache#3117)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron authored and zhu3pang committed Mar 29, 2024
1 parent 9a89253 commit a9ff40a
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final CompactManager compactManager;
private final boolean forceCompact;
private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
Expand Down Expand Up @@ -101,6 +102,7 @@ public AppendOnlyWriter(
this.compactManager = compactManager;
this.forceCompact = forceCompact;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
Expand All @@ -113,6 +115,7 @@ public AppendOnlyWriter(

if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
compactBefore.addAll(increment.compactIncrement().compactBefore());
compactAfter.addAll(increment.compactIncrement().compactAfter());
}
Expand Down Expand Up @@ -233,14 +236,18 @@ private void trySyncLatestCompaction(boolean blocking)

private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
new NewFilesIncrement(new ArrayList<>(newFiles), Collections.emptyList());
new NewFilesIncrement(
new ArrayList<>(newFiles),
new ArrayList<>(deletedFiles),
Collections.emptyList());
CompactIncrement compactIncrement =
new CompactIncrement(
new ArrayList<>(compactBefore),
new ArrayList<>(compactAfter),
Collections.emptyList());

newFiles.clear();
deletedFiles.clear();
compactBefore.clear();
compactAfter.clear();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,31 @@
public class NewFilesIncrement {

private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> changelogFiles;

public NewFilesIncrement(List<DataFileMeta> newFiles, List<DataFileMeta> changelogFiles) {
public NewFilesIncrement(
List<DataFileMeta> newFiles,
List<DataFileMeta> deletedFiles,
List<DataFileMeta> changelogFiles) {
this.newFiles = newFiles;
this.deletedFiles = deletedFiles;
this.changelogFiles = changelogFiles;
}

public static NewFilesIncrement emptyIncrement() {
return new NewFilesIncrement(Collections.emptyList(), Collections.emptyList());
return new NewFilesIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
}

public List<DataFileMeta> newFiles() {
return newFiles;
}

public List<DataFileMeta> deletedFiles() {
return deletedFiles;
}

public List<DataFileMeta> changelogFiles() {
return changelogFiles;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
@Nullable private final FieldsComparator userDefinedSeqComparator;

private final LinkedHashSet<DataFileMeta> newFiles;
private final LinkedHashSet<DataFileMeta> deletedFiles;
private final LinkedHashSet<DataFileMeta> newFilesChangelog;
private final LinkedHashMap<String, DataFileMeta> compactBefore;
private final LinkedHashSet<DataFileMeta> compactAfter;
Expand Down Expand Up @@ -107,12 +108,14 @@ public MergeTreeWriter(
this.userDefinedSeqComparator = userDefinedSeqComparator;

this.newFiles = new LinkedHashSet<>();
this.deletedFiles = new LinkedHashSet<>();
this.newFilesChangelog = new LinkedHashSet<>();
this.compactBefore = new LinkedHashMap<>();
this.compactAfter = new LinkedHashSet<>();
this.compactChangelog = new LinkedHashSet<>();
if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
newFilesChangelog.addAll(increment.newFilesIncrement().changelogFiles());
increment
.compactIncrement()
Expand Down Expand Up @@ -253,14 +256,17 @@ public void sync() throws Exception {
private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
new NewFilesIncrement(
new ArrayList<>(newFiles), new ArrayList<>(newFilesChangelog));
new ArrayList<>(newFiles),
new ArrayList<>(deletedFiles),
new ArrayList<>(newFilesChangelog));
CompactIncrement compactIncrement =
new CompactIncrement(
new ArrayList<>(compactBefore.values()),
new ArrayList<>(compactAfter),
new ArrayList<>(compactChangelog));

newFiles.clear();
deletedFiles.clear();
newFilesChangelog.clear();
compactBefore.clear();
compactAfter.clear();
Expand Down Expand Up @@ -306,6 +312,7 @@ public void close() throws Exception {
// delete temporary files
List<DataFileMeta> delete = new ArrayList<>(newFiles);
newFiles.clear();
deletedFiles.clear();

for (DataFileMeta file : newFilesChangelog) {
writerFactory.deleteFile(file.fileName(), file.level());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> d
return new CommitMessageImpl(
partition,
0,
new NewFilesIncrement(dataFileMetas, Collections.emptyList()),
new NewFilesIncrement(
dataFileMetas, Collections.emptyList(), Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,13 @@ private void collectChanges(
.newFilesIncrement()
.newFiles()
.forEach(m -> appendTableFiles.add(makeEntry(FileKind.ADD, commitMessage, m)));
commitMessage
.newFilesIncrement()
.deletedFiles()
.forEach(
m ->
appendTableFiles.add(
makeEntry(FileKind.DELETE, commitMessage, m)));
commitMessage
.newFilesIncrement()
.changelogFiles()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ private void serialize(CommitMessage obj, DataOutputView view) throws IOExceptio
serializeBinaryRow(obj.partition(), view);
view.writeInt(obj.bucket());
dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view);
dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(), view);
dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(), view);
dataFileSerializer.serializeList(message.compactIncrement().compactBefore(), view);
dataFileSerializer.serializeList(message.compactIncrement().compactAfter(), view);
Expand Down Expand Up @@ -116,6 +117,7 @@ private CommitMessage deserialize(DataInputView view) throws IOException {
deserializeBinaryRow(view),
view.readInt(),
new NewFilesIncrement(
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view)),
new CompactIncrement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ private CommitMessage createCommitMessage(BinaryRow partition, int bucket, Index
return new CommitMessageImpl(
partition,
bucket,
new NewFilesIncrement(Collections.emptyList(), Collections.emptyList()),
new NewFilesIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
new IndexIncrement(Collections.singletonList(file)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private static void addFileCommittables(

public static NewFilesIncrement randomNewFilesIncrement() {
return new NewFilesIncrement(
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand All @@ -318,6 +319,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand All @@ -333,6 +335,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand Down Expand Up @@ -367,6 +370,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand All @@ -383,6 +387,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand All @@ -398,6 +403,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand Down

0 comments on commit a9ff40a

Please sign in to comment.