Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] add deletedFiles in NewFilesIncrement #3117

Merged
merged 2 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final CompactManager compactManager;
private final boolean forceCompact;
private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
Expand Down Expand Up @@ -101,6 +102,7 @@ public AppendOnlyWriter(
this.compactManager = compactManager;
this.forceCompact = forceCompact;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
Expand All @@ -113,6 +115,7 @@ public AppendOnlyWriter(

if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
compactBefore.addAll(increment.compactIncrement().compactBefore());
compactAfter.addAll(increment.compactIncrement().compactAfter());
}
Expand Down Expand Up @@ -233,14 +236,18 @@ private void trySyncLatestCompaction(boolean blocking)

private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
new NewFilesIncrement(new ArrayList<>(newFiles), Collections.emptyList());
new NewFilesIncrement(
new ArrayList<>(newFiles),
new ArrayList<>(deletedFiles),
Collections.emptyList());
CompactIncrement compactIncrement =
new CompactIncrement(
new ArrayList<>(compactBefore),
new ArrayList<>(compactAfter),
Collections.emptyList());

newFiles.clear();
deletedFiles.clear();
compactBefore.clear();
compactAfter.clear();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,31 @@
public class NewFilesIncrement {

private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> changelogFiles;

public NewFilesIncrement(List<DataFileMeta> newFiles, List<DataFileMeta> changelogFiles) {
public NewFilesIncrement(
List<DataFileMeta> newFiles,
List<DataFileMeta> deletedFiles,
List<DataFileMeta> changelogFiles) {
this.newFiles = newFiles;
this.deletedFiles = deletedFiles;
this.changelogFiles = changelogFiles;
}

public static NewFilesIncrement emptyIncrement() {
return new NewFilesIncrement(Collections.emptyList(), Collections.emptyList());
return new NewFilesIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
}

public List<DataFileMeta> newFiles() {
return newFiles;
}

public List<DataFileMeta> deletedFiles() {
return deletedFiles;
}

public List<DataFileMeta> changelogFiles() {
return changelogFiles;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
@Nullable private final FieldsComparator userDefinedSeqComparator;

private final LinkedHashSet<DataFileMeta> newFiles;
private final LinkedHashSet<DataFileMeta> deletedFiles;
private final LinkedHashSet<DataFileMeta> newFilesChangelog;
private final LinkedHashMap<String, DataFileMeta> compactBefore;
private final LinkedHashSet<DataFileMeta> compactAfter;
Expand Down Expand Up @@ -107,12 +108,14 @@ public MergeTreeWriter(
this.userDefinedSeqComparator = userDefinedSeqComparator;

this.newFiles = new LinkedHashSet<>();
this.deletedFiles = new LinkedHashSet<>();
this.newFilesChangelog = new LinkedHashSet<>();
this.compactBefore = new LinkedHashMap<>();
this.compactAfter = new LinkedHashSet<>();
this.compactChangelog = new LinkedHashSet<>();
if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
newFilesChangelog.addAll(increment.newFilesIncrement().changelogFiles());
increment
.compactIncrement()
Expand Down Expand Up @@ -253,14 +256,17 @@ public void sync() throws Exception {
private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
new NewFilesIncrement(
new ArrayList<>(newFiles), new ArrayList<>(newFilesChangelog));
new ArrayList<>(newFiles),
new ArrayList<>(deletedFiles),
new ArrayList<>(newFilesChangelog));
CompactIncrement compactIncrement =
new CompactIncrement(
new ArrayList<>(compactBefore.values()),
new ArrayList<>(compactAfter),
new ArrayList<>(compactChangelog));

newFiles.clear();
deletedFiles.clear();
newFilesChangelog.clear();
compactBefore.clear();
compactAfter.clear();
Expand Down Expand Up @@ -306,6 +312,7 @@ public void close() throws Exception {
// delete temporary files
List<DataFileMeta> delete = new ArrayList<>(newFiles);
newFiles.clear();
deletedFiles.clear();

for (DataFileMeta file : newFilesChangelog) {
writerFactory.deleteFile(file.fileName(), file.level());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> d
return new CommitMessageImpl(
partition,
0,
new NewFilesIncrement(dataFileMetas, Collections.emptyList()),
new NewFilesIncrement(
dataFileMetas, Collections.emptyList(), Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,13 @@ private void collectChanges(
.newFilesIncrement()
.newFiles()
.forEach(m -> appendTableFiles.add(makeEntry(FileKind.ADD, commitMessage, m)));
commitMessage
.newFilesIncrement()
.deletedFiles()
.forEach(
m ->
appendTableFiles.add(
makeEntry(FileKind.DELETE, commitMessage, m)));
commitMessage
.newFilesIncrement()
.changelogFiles()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ private void serialize(CommitMessage obj, DataOutputView view) throws IOExceptio
serializeBinaryRow(obj.partition(), view);
view.writeInt(obj.bucket());
dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view);
dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(), view);
dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(), view);
dataFileSerializer.serializeList(message.compactIncrement().compactBefore(), view);
dataFileSerializer.serializeList(message.compactIncrement().compactAfter(), view);
Expand Down Expand Up @@ -116,6 +117,7 @@ private CommitMessage deserialize(DataInputView view) throws IOException {
deserializeBinaryRow(view),
view.readInt(),
new NewFilesIncrement(
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view)),
new CompactIncrement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ private CommitMessage createCommitMessage(BinaryRow partition, int bucket, Index
return new CommitMessageImpl(
partition,
bucket,
new NewFilesIncrement(Collections.emptyList(), Collections.emptyList()),
new NewFilesIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
new IndexIncrement(Collections.singletonList(file)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private static void addFileCommittables(

public static NewFilesIncrement randomNewFilesIncrement() {
return new NewFilesIncrement(
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand All @@ -318,6 +319,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand All @@ -333,6 +335,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand Down Expand Up @@ -367,6 +370,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand All @@ -383,6 +387,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand All @@ -398,6 +403,7 @@ public void testCommitInputEnd() throws Exception {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Expand Down
Loading