Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Remove FileStoreScan.withManifestList and fix unstable test #4552

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;

/** Default implementation of {@link FileStoreScan}. */
Expand All @@ -81,7 +80,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
protected ScanMode scanMode = ScanMode.ALL;
private Filter<Integer> levelFilter = null;
private Filter<ManifestEntry> manifestEntryFilter = null;
Expand Down Expand Up @@ -161,25 +159,16 @@ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {

@Override
public FileStoreScan withSnapshot(long snapshotId) {
checkState(specifiedManifests == null, "Cannot set both snapshot and manifests.");
this.specifiedSnapshot = snapshotManager.snapshot(snapshotId);
return this;
}

@Override
public FileStoreScan withSnapshot(Snapshot snapshot) {
checkState(specifiedManifests == null, "Cannot set both snapshot and manifests.");
this.specifiedSnapshot = snapshot;
return this;
}

@Override
public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
checkState(specifiedSnapshot == null, "Cannot set both snapshot and manifests.");
this.specifiedManifests = manifests;
return this;
}

@Override
public FileStoreScan withKind(ScanMode scanMode) {
this.scanMode = scanMode;
Expand Down Expand Up @@ -401,10 +390,6 @@ private <T extends FileEntry> Iterator<T> readAndNoMergeFileEntries(
}

private ManifestsReader.Result readManifests() {
if (specifiedManifests != null) {
return new ManifestsReader.Result(null, specifiedManifests, specifiedManifests);
}

return manifestsReader.read(specifiedSnapshot, scanMode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public interface FileStoreScan {

FileStoreScan withSnapshot(Snapshot snapshot);

FileStoreScan withManifestList(List<ManifestFileMeta> manifests);

FileStoreScan withKind(ScanMode scanMode);

FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
Expand Down
104 changes: 58 additions & 46 deletions paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.memory.HeapMemorySegmentPool;
Expand All @@ -38,7 +40,6 @@
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.options.MemorySize;
Expand Down Expand Up @@ -561,29 +562,41 @@ public Set<Path> getFilesInUse(long snapshotId) {
return getFilesInUse(
snapshotId,
snapshotManager(),
newScan(),
fileIO,
pathFactory(),
manifestListFactory().create());
manifestListFactory().create(),
manifestFileFactory().create());
}

public static Set<Path> getFilesInUse(
long snapshotId,
SnapshotManager snapshotManager,
FileStoreScan scan,
FileIO fileIO,
FileStorePathFactory pathFactory,
ManifestList manifestList) {
ManifestList manifestList,
ManifestFile manifestFile) {
Set<Path> result = new HashSet<>();

if (snapshotManager.snapshotExists(snapshotId)) {
result.addAll(
Set<Path> files =
getSnapshotFileInUse(
snapshotId, snapshotManager, scan, fileIO, pathFactory, manifestList));
snapshotId,
snapshotManager,
fileIO,
pathFactory,
manifestList,
manifestFile);
result.addAll(files);
} else if (snapshotManager.longLivedChangelogExists(snapshotId)) {
result.addAll(
Set<Path> files =
getChangelogFileInUse(
snapshotId, snapshotManager, scan, fileIO, pathFactory, manifestList));
snapshotId,
snapshotManager,
fileIO,
pathFactory,
manifestList,
manifestFile);
result.addAll(files);
} else {
throw new RuntimeException(
String.format("The snapshot %s does not exist.", snapshotId));
Expand All @@ -595,10 +608,10 @@ public static Set<Path> getFilesInUse(
private static Set<Path> getSnapshotFileInUse(
long snapshotId,
SnapshotManager snapshotManager,
FileStoreScan scan,
FileIO fileIO,
FileStorePathFactory pathFactory,
ManifestList manifestList) {
ManifestList manifestList,
ManifestFile manifestFile) {
Set<Path> result = new HashSet<>();
SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath());
CoreOptions options = new CoreOptions(schemaManager.latest().get().options());
Expand All @@ -625,7 +638,11 @@ private static Set<Path> getSnapshotFileInUse(
manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));

// data file
List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
List<ManifestEntry> entries =
manifests.stream()
.flatMap(m -> manifestFile.read(m.fileName()).stream())
.collect(Collectors.toList());
entries = new ArrayList<>(FileEntry.mergeEntries(entries));
for (ManifestEntry entry : entries) {
result.add(
new Path(
Expand All @@ -641,7 +658,9 @@ private static Set<Path> getSnapshotFileInUse(
// use list.
if (changelogDecoupled && !produceChangelog) {
entries =
scan.withManifestList(manifestList.readDeltaManifests(snapshot)).plan().files();
manifestList.readDeltaManifests(snapshot).stream()
.flatMap(m -> manifestFile.read(m.fileName()).stream())
.collect(Collectors.toList());
for (ManifestEntry entry : entries) {
// append delete file are delayed to delete
if (entry.kind() == FileKind.DELETE
Expand All @@ -661,64 +680,57 @@ private static Set<Path> getSnapshotFileInUse(
private static Set<Path> getChangelogFileInUse(
long changelogId,
SnapshotManager snapshotManager,
FileStoreScan scan,
FileIO fileIO,
FileStorePathFactory pathFactory,
ManifestList manifestList) {
ManifestList manifestList,
ManifestFile manifestFile) {
Set<Path> result = new HashSet<>();
SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath());
CoreOptions options = new CoreOptions(schemaManager.latest().get().options());
boolean produceChangelog =
options.changelogProducer() != CoreOptions.ChangelogProducer.NONE;

Path changelogPath = snapshotManager.longLivedChangelogPath(changelogId);
Changelog changelog = Changelog.fromPath(fileIO, changelogPath);

// changelog file
result.add(changelogPath);

// manifest lists
if (!produceChangelog) {
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
}
if (changelog.changelogManifestList() != null) {
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
}

// manifests
List<ManifestFileMeta> manifests =
new ArrayList<>(manifestList.readChangelogManifests(changelog));
if (!produceChangelog) {
manifests.addAll(manifestList.readDataManifests(changelog));
}

manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));

// data file
// not all manifests contains useful data file
// (1) produceChangelog = 'true': data file in changelog manifests
// (2) produceChangelog = 'false': 'APPEND' data file in delta manifests

// delta file
if (!produceChangelog) {
for (ManifestEntry entry :
scan.withManifestList(manifestList.readDeltaManifests(changelog))
.plan()
.files()) {
if (options.changelogProducer() == CoreOptions.ChangelogProducer.NONE) {
// TODO why we need to keep base manifests?
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
manifestList
.readDataManifests(changelog)
.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));

result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
List<ManifestFileMeta> manifests = manifestList.readDeltaManifests(changelog);
manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
List<ManifestEntry> files =
manifests.stream()
.flatMap(m -> manifestFile.read(m.fileName()).stream())
.collect(Collectors.toList());
for (ManifestEntry entry : files) {
if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) {
result.add(
new Path(
pathFactory.bucketPath(entry.partition(), entry.bucket()),
entry.file().fileName()));
}
}
} else {
// changelog
for (ManifestEntry entry :
scan.withManifestList(manifestList.readChangelogManifests(changelog))
.plan()
.files()) {
} else if (changelog.changelogManifestList() != null) {
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
List<ManifestFileMeta> manifests = manifestList.readChangelogManifests(changelog);
manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
List<ManifestEntry> files =
manifests.stream()
.flatMap(m -> manifestFile.read(m.fileName()).stream())
.collect(Collectors.toList());
for (ManifestEntry entry : files) {
result.add(
new Path(
pathFactory.bucketPath(entry.partition(), entry.bucket()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -451,7 +452,7 @@ public void testExpireWithUpgradedFile() throws Exception {
store.assertCleaned();
}

@Test
@RepeatedTest(5)
public void testChangelogOutLivedSnapshot() throws Exception {
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -252,29 +250,6 @@ public void testWithSnapshot() throws Exception {
runTestExactMatch(scan, wantedSnapshot, expected);
}

@Test
public void testWithManifestList() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numCommits = random.nextInt(10) + 1;
for (int i = 0; i < numCommits; i++) {
List<KeyValue> data = generateData(random.nextInt(100) + 1);
writeData(data);
}

ManifestList manifestList = store.manifestListFactory().create();
long wantedSnapshotId = random.nextLong(snapshotManager.latestSnapshotId()) + 1;
Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId);
List<ManifestFileMeta> wantedManifests = manifestList.readDataManifests(wantedSnapshot);

FileStoreScan scan = store.newScan();
scan.withManifestList(wantedManifests);

List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshotId);
gen.sort(expectedKvs);
Map<BinaryRow, BinaryRow> expected = store.toKvMap(expectedKvs);
runTestExactMatch(scan, null, expected);
}

@Test
public void testDropStatsInPlan() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1473,10 +1473,10 @@ public void testAsyncExpireExecutionMode() throws Exception {
TestFileStore.getFilesInUse(
latestSnapshotId,
snapshotManager,
store.newScan(),
table.fileIO(),
store.pathFactory(),
store.manifestListFactory().create());
store.manifestListFactory().create(),
store.manifestFileFactory().create());

List<Path> unusedFileList =
Files.walk(Paths.get(tempDir.toString()))
Expand Down
Loading