diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 98e064451509..c73a92062b80 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -63,7 +63,6 @@ import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn; import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute; import static org.apache.paimon.utils.Preconditions.checkArgument; -import static org.apache.paimon.utils.Preconditions.checkState; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; /** Default implementation of {@link FileStoreScan}. */ @@ -81,7 +80,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Snapshot specifiedSnapshot = null; private Filter bucketFilter = null; private BiFilter totalAwareBucketFilter = null; - private List specifiedManifests = null; protected ScanMode scanMode = ScanMode.ALL; private Filter levelFilter = null; private Filter manifestEntryFilter = null; @@ -161,25 +159,16 @@ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) { @Override public FileStoreScan withSnapshot(long snapshotId) { - checkState(specifiedManifests == null, "Cannot set both snapshot and manifests."); this.specifiedSnapshot = snapshotManager.snapshot(snapshotId); return this; } @Override public FileStoreScan withSnapshot(Snapshot snapshot) { - checkState(specifiedManifests == null, "Cannot set both snapshot and manifests."); this.specifiedSnapshot = snapshot; return this; } - @Override - public FileStoreScan withManifestList(List manifests) { - checkState(specifiedSnapshot == null, "Cannot set both snapshot and manifests."); - this.specifiedManifests = manifests; - return this; - } - @Override public FileStoreScan withKind(ScanMode scanMode) { this.scanMode = scanMode; @@ -401,10 +390,6 @@ private Iterator readAndNoMergeFileEntries( } private ManifestsReader.Result readManifests() { - if (specifiedManifests != null) { - return new ManifestsReader.Result(null, specifiedManifests, specifiedManifests); - } - return manifestsReader.read(specifiedSnapshot, scanMode); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 7663f48229c6..179d16de6cd2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -67,8 +67,6 @@ public interface FileStoreScan { FileStoreScan withSnapshot(Snapshot snapshot); - FileStoreScan withManifestList(List manifests); - FileStoreScan withKind(ScanMode scanMode); FileStoreScan withLevelFilter(Filter levelFilter); diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 5218a515a337..0d8ea5f4a49a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -26,10 +26,12 @@ import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.memory.HeapMemorySegmentPool; @@ -38,7 +40,6 @@ import org.apache.paimon.operation.AbstractFileStoreWrite; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreCommitImpl; -import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.SplitRead; import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.options.MemorySize; @@ -561,29 +562,41 @@ public Set getFilesInUse(long snapshotId) { return getFilesInUse( snapshotId, snapshotManager(), - newScan(), fileIO, pathFactory(), - manifestListFactory().create()); + manifestListFactory().create(), + manifestFileFactory().create()); } public static Set getFilesInUse( long snapshotId, SnapshotManager snapshotManager, - FileStoreScan scan, FileIO fileIO, FileStorePathFactory pathFactory, - ManifestList manifestList) { + ManifestList manifestList, + ManifestFile manifestFile) { Set result = new HashSet<>(); if (snapshotManager.snapshotExists(snapshotId)) { - result.addAll( + Set files = getSnapshotFileInUse( - snapshotId, snapshotManager, scan, fileIO, pathFactory, manifestList)); + snapshotId, + snapshotManager, + fileIO, + pathFactory, + manifestList, + manifestFile); + result.addAll(files); } else if (snapshotManager.longLivedChangelogExists(snapshotId)) { - result.addAll( + Set files = getChangelogFileInUse( - snapshotId, snapshotManager, scan, fileIO, pathFactory, manifestList)); + snapshotId, + snapshotManager, + fileIO, + pathFactory, + manifestList, + manifestFile); + result.addAll(files); } else { throw new RuntimeException( String.format("The snapshot %s does not exist.", snapshotId)); @@ -595,10 +608,10 @@ public static Set getFilesInUse( private static Set getSnapshotFileInUse( long snapshotId, SnapshotManager snapshotManager, - FileStoreScan scan, FileIO fileIO, FileStorePathFactory pathFactory, - ManifestList manifestList) { + ManifestList manifestList, + ManifestFile manifestFile) { Set result = new HashSet<>(); SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath()); CoreOptions options = new CoreOptions(schemaManager.latest().get().options()); @@ -625,7 +638,11 @@ private static Set getSnapshotFileInUse( manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); // data file - List entries = scan.withManifestList(manifests).plan().files(); + List entries = + manifests.stream() + .flatMap(m -> manifestFile.read(m.fileName()).stream()) + .collect(Collectors.toList()); + entries = new ArrayList<>(FileEntry.mergeEntries(entries)); for (ManifestEntry entry : entries) { result.add( new Path( @@ -641,7 +658,9 @@ private static Set getSnapshotFileInUse( // use list. if (changelogDecoupled && !produceChangelog) { entries = - scan.withManifestList(manifestList.readDeltaManifests(snapshot)).plan().files(); + manifestList.readDeltaManifests(snapshot).stream() + .flatMap(m -> manifestFile.read(m.fileName()).stream()) + .collect(Collectors.toList()); for (ManifestEntry entry : entries) { // append delete file are delayed to delete if (entry.kind() == FileKind.DELETE @@ -661,15 +680,13 @@ private static Set getSnapshotFileInUse( private static Set getChangelogFileInUse( long changelogId, SnapshotManager snapshotManager, - FileStoreScan scan, FileIO fileIO, FileStorePathFactory pathFactory, - ManifestList manifestList) { + ManifestList manifestList, + ManifestFile manifestFile) { Set result = new HashSet<>(); SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath()); CoreOptions options = new CoreOptions(schemaManager.latest().get().options()); - boolean produceChangelog = - options.changelogProducer() != CoreOptions.ChangelogProducer.NONE; Path changelogPath = snapshotManager.longLivedChangelogPath(changelogId); Changelog changelog = Changelog.fromPath(fileIO, changelogPath); @@ -677,35 +694,27 @@ private static Set getChangelogFileInUse( // changelog file result.add(changelogPath); - // manifest lists - if (!produceChangelog) { - result.add(pathFactory.toManifestListPath(changelog.baseManifestList())); - result.add(pathFactory.toManifestListPath(changelog.deltaManifestList())); - } - if (changelog.changelogManifestList() != null) { - result.add(pathFactory.toManifestListPath(changelog.changelogManifestList())); - } - - // manifests - List manifests = - new ArrayList<>(manifestList.readChangelogManifests(changelog)); - if (!produceChangelog) { - manifests.addAll(manifestList.readDataManifests(changelog)); - } - - manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); - // data file // not all manifests contains useful data file // (1) produceChangelog = 'true': data file in changelog manifests // (2) produceChangelog = 'false': 'APPEND' data file in delta manifests // delta file - if (!produceChangelog) { - for (ManifestEntry entry : - scan.withManifestList(manifestList.readDeltaManifests(changelog)) - .plan() - .files()) { + if (options.changelogProducer() == CoreOptions.ChangelogProducer.NONE) { + // TODO why we need to keep base manifests? + result.add(pathFactory.toManifestListPath(changelog.baseManifestList())); + manifestList + .readDataManifests(changelog) + .forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); + + result.add(pathFactory.toManifestListPath(changelog.deltaManifestList())); + List manifests = manifestList.readDeltaManifests(changelog); + manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); + List files = + manifests.stream() + .flatMap(m -> manifestFile.read(m.fileName()).stream()) + .collect(Collectors.toList()); + for (ManifestEntry entry : files) { if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) { result.add( new Path( @@ -713,12 +722,15 @@ private static Set getChangelogFileInUse( entry.file().fileName())); } } - } else { - // changelog - for (ManifestEntry entry : - scan.withManifestList(manifestList.readChangelogManifests(changelog)) - .plan() - .files()) { + } else if (changelog.changelogManifestList() != null) { + result.add(pathFactory.toManifestListPath(changelog.changelogManifestList())); + List manifests = manifestList.readChangelogManifests(changelog); + manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); + List files = + manifests.stream() + .flatMap(m -> manifestFile.read(m.fileName()).stream()) + .collect(Collectors.toList()); + for (ManifestEntry entry : files) { result.add( new Path( pathFactory.bucketPath(entry.partition(), entry.bucket()), diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 739d4b6bd6b3..96dce3d78426 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -44,6 +44,7 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -451,7 +452,7 @@ public void testExpireWithUpgradedFile() throws Exception { store.assertCleaned(); } - @Test + @RepeatedTest(5) public void testChangelogOutLivedSnapshot() throws Exception { List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 2fd8c10cd944..4f3d5c1c24dd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -26,8 +26,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.manifest.ManifestFileMeta; -import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; @@ -252,29 +250,6 @@ public void testWithSnapshot() throws Exception { runTestExactMatch(scan, wantedSnapshot, expected); } - @Test - public void testWithManifestList() throws Exception { - ThreadLocalRandom random = ThreadLocalRandom.current(); - int numCommits = random.nextInt(10) + 1; - for (int i = 0; i < numCommits; i++) { - List data = generateData(random.nextInt(100) + 1); - writeData(data); - } - - ManifestList manifestList = store.manifestListFactory().create(); - long wantedSnapshotId = random.nextLong(snapshotManager.latestSnapshotId()) + 1; - Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId); - List wantedManifests = manifestList.readDataManifests(wantedSnapshot); - - FileStoreScan scan = store.newScan(); - scan.withManifestList(wantedManifests); - - List expectedKvs = store.readKvsFromSnapshot(wantedSnapshotId); - gen.sort(expectedKvs); - Map expected = store.toKvMap(expectedKvs); - runTestExactMatch(scan, null, expected); - } - @Test public void testDropStatsInPlan() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index f6343bfe437f..4d8408955d38 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1473,10 +1473,10 @@ public void testAsyncExpireExecutionMode() throws Exception { TestFileStore.getFilesInUse( latestSnapshotId, snapshotManager, - store.newScan(), table.fileIO(), store.pathFactory(), - store.manifestListFactory().create()); + store.manifestListFactory().create(), + store.manifestFileFactory().create()); List unusedFileList = Files.walk(Paths.get(tempDir.toString()))