Skip to content

Commit

Permalink
[core] Remove FileStoreScan.withManifestList and fix unstable test (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Nov 20, 2024
1 parent 06fbb5e commit da4d64b
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;

/** Default implementation of {@link FileStoreScan}. */
Expand All @@ -81,7 +80,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
protected ScanMode scanMode = ScanMode.ALL;
private Filter<Integer> levelFilter = null;
private Filter<ManifestEntry> manifestEntryFilter = null;
Expand Down Expand Up @@ -161,25 +159,16 @@ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {

/**
 * Selects the snapshot to scan by its id.
 *
 * <p>The id is resolved through {@code snapshotManager.snapshot(snapshotId)} and the result is
 * stored as the specified snapshot. A state check rejects the call when a manifest list has
 * already been specified, because the two selections are mutually exclusive.
 *
 * @param snapshotId id of the snapshot to scan
 * @return this scan, to allow call chaining
 */
@Override
public FileStoreScan withSnapshot(long snapshotId) {
// Snapshot and manifest list cannot both be set on the same scan.
checkState(specifiedManifests == null, "Cannot set both snapshot and manifests.");
this.specifiedSnapshot = snapshotManager.snapshot(snapshotId);
return this;
}

/**
 * Selects the snapshot to scan from an already loaded {@link Snapshot}.
 *
 * <p>The given snapshot is stored as-is as the specified snapshot. A state check rejects the
 * call when a manifest list has already been specified, because the two selections are
 * mutually exclusive.
 *
 * @param snapshot snapshot to scan
 * @return this scan, to allow call chaining
 */
@Override
public FileStoreScan withSnapshot(Snapshot snapshot) {
// Snapshot and manifest list cannot both be set on the same scan.
checkState(specifiedManifests == null, "Cannot set both snapshot and manifests.");
this.specifiedSnapshot = snapshot;
return this;
}

/**
 * Selects an explicit list of manifests to scan instead of a snapshot.
 *
 * <p>The given list is stored by reference (no copy is made). A state check rejects the call
 * when a snapshot has already been specified, because the two selections are mutually
 * exclusive.
 *
 * @param manifests manifest file metas to scan
 * @return this scan, to allow call chaining
 */
@Override
public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
// Snapshot and manifest list cannot both be set on the same scan.
checkState(specifiedSnapshot == null, "Cannot set both snapshot and manifests.");
this.specifiedManifests = manifests;
return this;
}

@Override
public FileStoreScan withKind(ScanMode scanMode) {
this.scanMode = scanMode;
Expand Down Expand Up @@ -401,10 +390,6 @@ private <T extends FileEntry> Iterator<T> readAndNoMergeFileEntries(
}

/**
 * Resolves the manifests this scan should read.
 *
 * <p>When manifests were specified explicitly, they are wrapped directly in a {@link
 * ManifestsReader.Result} with a {@code null} first argument and the same list passed for both
 * remaining arguments. Otherwise the lookup is delegated to {@code manifestsReader.read} with
 * the specified snapshot (which may be {@code null}) and the current scan mode.
 *
 * @return the manifests reader result for this scan
 */
private ManifestsReader.Result readManifests() {
// Explicit manifests take precedence and bypass the manifests reader entirely.
if (specifiedManifests != null) {
return new ManifestsReader.Result(null, specifiedManifests, specifiedManifests);
}

return manifestsReader.read(specifiedSnapshot, scanMode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public interface FileStoreScan {

FileStoreScan withSnapshot(Snapshot snapshot);

FileStoreScan withManifestList(List<ManifestFileMeta> manifests);

FileStoreScan withKind(ScanMode scanMode);

FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
Expand Down
104 changes: 58 additions & 46 deletions paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.memory.HeapMemorySegmentPool;
Expand All @@ -38,7 +40,6 @@
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.options.MemorySize;
Expand Down Expand Up @@ -561,29 +562,41 @@ public Set<Path> getFilesInUse(long snapshotId) {
return getFilesInUse(
snapshotId,
snapshotManager(),
newScan(),
fileIO,
pathFactory(),
manifestListFactory().create());
manifestListFactory().create(),
manifestFileFactory().create());
}

public static Set<Path> getFilesInUse(
long snapshotId,
SnapshotManager snapshotManager,
FileStoreScan scan,
FileIO fileIO,
FileStorePathFactory pathFactory,
ManifestList manifestList) {
ManifestList manifestList,
ManifestFile manifestFile) {
Set<Path> result = new HashSet<>();

if (snapshotManager.snapshotExists(snapshotId)) {
result.addAll(
Set<Path> files =
getSnapshotFileInUse(
snapshotId, snapshotManager, scan, fileIO, pathFactory, manifestList));
snapshotId,
snapshotManager,
fileIO,
pathFactory,
manifestList,
manifestFile);
result.addAll(files);
} else if (snapshotManager.longLivedChangelogExists(snapshotId)) {
result.addAll(
Set<Path> files =
getChangelogFileInUse(
snapshotId, snapshotManager, scan, fileIO, pathFactory, manifestList));
snapshotId,
snapshotManager,
fileIO,
pathFactory,
manifestList,
manifestFile);
result.addAll(files);
} else {
throw new RuntimeException(
String.format("The snapshot %s does not exist.", snapshotId));
Expand All @@ -595,10 +608,10 @@ public static Set<Path> getFilesInUse(
private static Set<Path> getSnapshotFileInUse(
long snapshotId,
SnapshotManager snapshotManager,
FileStoreScan scan,
FileIO fileIO,
FileStorePathFactory pathFactory,
ManifestList manifestList) {
ManifestList manifestList,
ManifestFile manifestFile) {
Set<Path> result = new HashSet<>();
SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath());
CoreOptions options = new CoreOptions(schemaManager.latest().get().options());
Expand All @@ -625,7 +638,11 @@ private static Set<Path> getSnapshotFileInUse(
manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));

// data file
List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
List<ManifestEntry> entries =
manifests.stream()
.flatMap(m -> manifestFile.read(m.fileName()).stream())
.collect(Collectors.toList());
entries = new ArrayList<>(FileEntry.mergeEntries(entries));
for (ManifestEntry entry : entries) {
result.add(
new Path(
Expand All @@ -641,7 +658,9 @@ private static Set<Path> getSnapshotFileInUse(
// use list.
if (changelogDecoupled && !produceChangelog) {
entries =
scan.withManifestList(manifestList.readDeltaManifests(snapshot)).plan().files();
manifestList.readDeltaManifests(snapshot).stream()
.flatMap(m -> manifestFile.read(m.fileName()).stream())
.collect(Collectors.toList());
for (ManifestEntry entry : entries) {
// append delete file are delayed to delete
if (entry.kind() == FileKind.DELETE
Expand All @@ -661,64 +680,57 @@ private static Set<Path> getSnapshotFileInUse(
private static Set<Path> getChangelogFileInUse(
long changelogId,
SnapshotManager snapshotManager,
FileStoreScan scan,
FileIO fileIO,
FileStorePathFactory pathFactory,
ManifestList manifestList) {
ManifestList manifestList,
ManifestFile manifestFile) {
Set<Path> result = new HashSet<>();
SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath());
CoreOptions options = new CoreOptions(schemaManager.latest().get().options());
boolean produceChangelog =
options.changelogProducer() != CoreOptions.ChangelogProducer.NONE;

Path changelogPath = snapshotManager.longLivedChangelogPath(changelogId);
Changelog changelog = Changelog.fromPath(fileIO, changelogPath);

// changelog file
result.add(changelogPath);

// manifest lists
if (!produceChangelog) {
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
}
if (changelog.changelogManifestList() != null) {
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
}

// manifests
List<ManifestFileMeta> manifests =
new ArrayList<>(manifestList.readChangelogManifests(changelog));
if (!produceChangelog) {
manifests.addAll(manifestList.readDataManifests(changelog));
}

manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));

// data file
// not all manifests contain useful data files
// (1) produceChangelog = 'true': data file in changelog manifests
// (2) produceChangelog = 'false': 'APPEND' data file in delta manifests

// delta file
if (!produceChangelog) {
for (ManifestEntry entry :
scan.withManifestList(manifestList.readDeltaManifests(changelog))
.plan()
.files()) {
if (options.changelogProducer() == CoreOptions.ChangelogProducer.NONE) {
// TODO: why do we need to keep base manifests?
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
manifestList
.readDataManifests(changelog)
.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));

result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
List<ManifestFileMeta> manifests = manifestList.readDeltaManifests(changelog);
manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
List<ManifestEntry> files =
manifests.stream()
.flatMap(m -> manifestFile.read(m.fileName()).stream())
.collect(Collectors.toList());
for (ManifestEntry entry : files) {
if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) {
result.add(
new Path(
pathFactory.bucketPath(entry.partition(), entry.bucket()),
entry.file().fileName()));
}
}
} else {
// changelog
for (ManifestEntry entry :
scan.withManifestList(manifestList.readChangelogManifests(changelog))
.plan()
.files()) {
} else if (changelog.changelogManifestList() != null) {
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
List<ManifestFileMeta> manifests = manifestList.readChangelogManifests(changelog);
manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
List<ManifestEntry> files =
manifests.stream()
.flatMap(m -> manifestFile.read(m.fileName()).stream())
.collect(Collectors.toList());
for (ManifestEntry entry : files) {
result.add(
new Path(
pathFactory.bucketPath(entry.partition(), entry.bucket()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -451,7 +452,7 @@ public void testExpireWithUpgradedFile() throws Exception {
store.assertCleaned();
}

@Test
@RepeatedTest(5)
public void testChangelogOutLivedSnapshot() throws Exception {
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -252,29 +250,6 @@ public void testWithSnapshot() throws Exception {
runTestExactMatch(scan, wantedSnapshot, expected);
}

/**
 * Verifies that a scan configured with an explicit manifest list returns the data of the
 * snapshot those manifests belong to.
 *
 * <p>The test performs 1 to 10 writes of 1 to 100 generated records each, picks a random
 * snapshot id between 1 and the latest snapshot id, reads that snapshot's data manifests and
 * scans them via {@code withManifestList}. The scan result is compared against the key-values
 * read from the same snapshot.
 */
@Test
public void testWithManifestList() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numCommits = random.nextInt(10) + 1;
for (int i = 0; i < numCommits; i++) {
List<KeyValue> data = generateData(random.nextInt(100) + 1);
writeData(data);
}

ManifestList manifestList = store.manifestListFactory().create();
// nextLong(bound) is exclusive of bound, so the +1 yields an id in [1, latestSnapshotId].
long wantedSnapshotId = random.nextLong(snapshotManager.latestSnapshotId()) + 1;
Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId);
List<ManifestFileMeta> wantedManifests = manifestList.readDataManifests(wantedSnapshot);

FileStoreScan scan = store.newScan();
scan.withManifestList(wantedManifests);

List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshotId);
gen.sort(expectedKvs);
Map<BinaryRow, BinaryRow> expected = store.toKvMap(expectedKvs);
// null is passed as the second argument because the scan is not bound to a snapshot here.
runTestExactMatch(scan, null, expected);
}

@Test
public void testDropStatsInPlan() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1473,10 +1473,10 @@ public void testAsyncExpireExecutionMode() throws Exception {
TestFileStore.getFilesInUse(
latestSnapshotId,
snapshotManager,
store.newScan(),
table.fileIO(),
store.pathFactory(),
store.manifestListFactory().create());
store.manifestListFactory().create(),
store.manifestFileFactory().create());

List<Path> unusedFileList =
Files.walk(Paths.get(tempDir.toString()))
Expand Down

0 comments on commit da4d64b

Please sign in to comment.