diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index f61083f11a28..544e766ae0d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -66,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final CompactManager compactManager; private final boolean forceCompact; private final List newFiles; + private final List deletedFiles; private final List compactBefore; private final List compactAfter; private final LongCounter seqNumCounter; @@ -101,6 +102,7 @@ public AppendOnlyWriter( this.compactManager = compactManager; this.forceCompact = forceCompact; this.newFiles = new ArrayList<>(); + this.deletedFiles = new ArrayList<>(); this.compactBefore = new ArrayList<>(); this.compactAfter = new ArrayList<>(); this.seqNumCounter = new LongCounter(maxSequenceNumber + 1); @@ -113,6 +115,7 @@ public AppendOnlyWriter( if (increment != null) { newFiles.addAll(increment.newFilesIncrement().newFiles()); + deletedFiles.addAll(increment.newFilesIncrement().deletedFiles()); compactBefore.addAll(increment.compactIncrement().compactBefore()); compactAfter.addAll(increment.compactIncrement().compactAfter()); } @@ -233,7 +236,10 @@ private void trySyncLatestCompaction(boolean blocking) private CommitIncrement drainIncrement() { NewFilesIncrement newFilesIncrement = - new NewFilesIncrement(new ArrayList<>(newFiles), Collections.emptyList()); + new NewFilesIncrement( + new ArrayList<>(newFiles), + new ArrayList<>(deletedFiles), + Collections.emptyList()); CompactIncrement compactIncrement = new CompactIncrement( new ArrayList<>(compactBefore), @@ -241,6 +247,7 @@ private CommitIncrement drainIncrement() { Collections.emptyList()); newFiles.clear(); + deletedFiles.clear(); compactBefore.clear(); compactAfter.clear(); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java index 4e980bd31f58..b2f63070d131 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java @@ -27,21 +27,31 @@ public class NewFilesIncrement { private final List newFiles; + private final List deletedFiles; private final List changelogFiles; - public NewFilesIncrement(List newFiles, List changelogFiles) { + public NewFilesIncrement( + List newFiles, + List deletedFiles, + List changelogFiles) { this.newFiles = newFiles; + this.deletedFiles = deletedFiles; this.changelogFiles = changelogFiles; } public static NewFilesIncrement emptyIncrement() { - return new NewFilesIncrement(Collections.emptyList(), Collections.emptyList()); + return new NewFilesIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); } public List newFiles() { return newFiles; } + public List deletedFiles() { + return deletedFiles; + } + public List changelogFiles() { return changelogFiles; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 537d838d0cbb..1c94a2108561 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -69,6 +69,7 @@ public class MergeTreeWriter implements RecordWriter, MemoryOwner { @Nullable private final FieldsComparator userDefinedSeqComparator; private final LinkedHashSet newFiles; + private final LinkedHashSet deletedFiles; private final LinkedHashSet newFilesChangelog; private final LinkedHashMap compactBefore; private final LinkedHashSet compactAfter; @@ -107,12 +108,14 @@ public MergeTreeWriter( this.userDefinedSeqComparator = userDefinedSeqComparator; this.newFiles = new LinkedHashSet<>(); + this.deletedFiles = new LinkedHashSet<>(); this.newFilesChangelog = new LinkedHashSet<>(); this.compactBefore = new LinkedHashMap<>(); this.compactAfter = new LinkedHashSet<>(); this.compactChangelog = new LinkedHashSet<>(); if (increment != null) { newFiles.addAll(increment.newFilesIncrement().newFiles()); + deletedFiles.addAll(increment.newFilesIncrement().deletedFiles()); newFilesChangelog.addAll(increment.newFilesIncrement().changelogFiles()); increment .compactIncrement() @@ -253,7 +256,9 @@ public void sync() throws Exception { private CommitIncrement drainIncrement() { NewFilesIncrement newFilesIncrement = new NewFilesIncrement( - new ArrayList<>(newFiles), new ArrayList<>(newFilesChangelog)); + new ArrayList<>(newFiles), + new ArrayList<>(deletedFiles), + new ArrayList<>(newFilesChangelog)); CompactIncrement compactIncrement = new CompactIncrement( new ArrayList<>(compactBefore.values()), @@ -261,6 +266,7 @@ private CommitIncrement drainIncrement() { new ArrayList<>(compactChangelog)); newFiles.clear(); + deletedFiles.clear(); newFilesChangelog.clear(); compactBefore.clear(); compactAfter.clear(); @@ -306,6 +312,7 @@ public void close() throws Exception { // delete temporary files List delete = new ArrayList<>(newFiles); newFiles.clear(); + deletedFiles.clear(); for (DataFileMeta file : newFilesChangelog) { writerFactory.deleteFile(file.fileName(), file.level()); diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 2b4612229bc1..ddb11a658786 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -86,7 +86,8 @@ public static CommitMessage commitFile(BinaryRow partition, List d return new CommitMessageImpl( partition, 0, - new NewFilesIncrement(dataFileMetas, Collections.emptyList()), + new NewFilesIncrement( + dataFileMetas, Collections.emptyList(), Collections.emptyList()), new CompactIncrement( Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 5c6343956752..3f2ad87ac4a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -572,6 +572,13 @@ private void collectChanges( .newFilesIncrement() .newFiles() .forEach(m -> appendTableFiles.add(makeEntry(FileKind.ADD, commitMessage, m))); + commitMessage + .newFilesIncrement() + .deletedFiles() + .forEach( + m -> + appendTableFiles.add( + makeEntry(FileKind.DELETE, commitMessage, m))); commitMessage .newFilesIncrement() .changelogFiles() diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index da70c766d6b1..65a647edb9ee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -75,6 +75,7 @@ private void serialize(CommitMessage obj, DataOutputView view) throws IOExceptio serializeBinaryRow(obj.partition(), view); view.writeInt(obj.bucket()); dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view); + dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(), view); dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(), view); dataFileSerializer.serializeList(message.compactIncrement().compactBefore(), view); dataFileSerializer.serializeList(message.compactIncrement().compactAfter(), view); @@ -116,6 +117,7 @@ private CommitMessage deserialize(DataInputView view) throws IOException { deserializeBinaryRow(view), view.readInt(), new NewFilesIncrement( + dataFileSerializer.deserializeList(view), dataFileSerializer.deserializeList(view), dataFileSerializer.deserializeList(view)), new CompactIncrement( diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java index 7bb11260a336..0ed83d3e0200 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java @@ -111,7 +111,8 @@ private CommitMessage createCommitMessage(BinaryRow partition, int bucket, Index return new CommitMessageImpl( partition, bucket, - new NewFilesIncrement(Collections.emptyList(), Collections.emptyList()), + new NewFilesIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement( Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new IndexIncrement(Collections.singletonList(file))); diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java index ee279c097a17..ae01b58323e7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java @@ -90,6 +90,7 @@ private static void addFileCommittables( public static NewFilesIncrement randomNewFilesIncrement() { return new NewFilesIncrement( + Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)), Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)), Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0))); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java index 0a3a8fae791c..26a7717325a3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java @@ -302,6 +302,7 @@ public void testCommitInputEnd() throws Exception { BinaryRow.EMPTY_ROW, 0, new NewFilesIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement( @@ -318,6 +319,7 @@ public void testCommitInputEnd() throws Exception { BinaryRow.EMPTY_ROW, 0, new NewFilesIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement( @@ -333,6 +335,7 @@ public void testCommitInputEnd() throws Exception { BinaryRow.EMPTY_ROW, 0, new NewFilesIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement( @@ -367,6 +370,7 @@ public void testCommitInputEnd() throws Exception { BinaryRow.EMPTY_ROW, 0, new NewFilesIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement( @@ -383,6 +387,7 @@ public void testCommitInputEnd() throws Exception { BinaryRow.EMPTY_ROW, 0, new NewFilesIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement( @@ -398,6 +403,7 @@ public void testCommitInputEnd() throws Exception { BinaryRow.EMPTY_ROW, 0, new NewFilesIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(