Skip to content

Commit

Permalink
[core] Optimize memory usage for expiring snapshots and tags (#4655)
Browse files Browse the repository at this point in the history
This closes #4655.
  • Loading branch information
JingsongLi authored Dec 9, 2024
1 parent e18f6ed commit 9191e2e
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.manifest;

import org.apache.paimon.data.BinaryRow;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
 * A {@link SimpleFileEntry} that additionally carries the optional {@link FileSource} copied from
 * the {@link ManifestEntry} it was created from.
 */
public class ExpireFileEntry extends SimpleFileEntry {

    /** Source of the file, or {@code null} when the originating entry had none. */
    @Nullable private final FileSource fileSource;

    /**
     * Creates an entry from its individual fields. All arguments except {@code fileSource} are
     * forwarded unchanged to the {@link SimpleFileEntry} constructor.
     */
    public ExpireFileEntry(
            FileKind kind,
            BinaryRow partition,
            int bucket,
            int level,
            String fileName,
            List<String> extraFiles,
            @Nullable byte[] embeddedIndex,
            BinaryRow minKey,
            BinaryRow maxKey,
            @Nullable FileSource fileSource) {
        super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey);
        this.fileSource = fileSource;
    }

    /** Returns the file source, or an empty {@link Optional} when none was recorded. */
    public Optional<FileSource> fileSource() {
        if (fileSource == null) {
            return Optional.empty();
        }
        return Optional.of(fileSource);
    }

    /**
     * Builds an {@link ExpireFileEntry} holding the subset of fields of {@code entry} that this
     * class keeps.
     *
     * @param entry the manifest entry to copy from
     * @return a new entry; its file source is {@code null} if {@code entry}'s file has none
     */
    public static ExpireFileEntry from(ManifestEntry entry) {
        return new ExpireFileEntry(
                entry.kind(),
                entry.partition(),
                entry.bucket(),
                entry.level(),
                entry.fileName(),
                entry.file().extraFiles(),
                entry.file().embeddedIndex(),
                entry.minKey(),
                entry.maxKey(),
                entry.file().fileSource().orElse(null));
    }

    /**
     * Two entries are equal when they are of exactly the same class, the superclass considers them
     * equal, and they reference the same {@link FileSource}.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
            return false;
        }
        ExpireFileEntry other = (ExpireFileEntry) o;
        return this.fileSource == other.fileSource;
    }

    /** Combines the superclass hash with the file source, consistently with {@link #equals}. */
    @Override
    public int hashCode() {
        int parentHash = super.hashCode();
        return Objects.hash(parentHash, fileSource);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public interface FileEntry {

BinaryRow maxKey();

/**
 * Returns the names of the extra files attached to this entry. The deletion code in this change
 * resolves each name against the entry's bucket path.
 */
List<String> extraFiles();

/**
* The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
* file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public BinaryRow maxKey() {
return file.maxKey();
}

/** Returns the extra files of this entry by delegating to {@code file.extraFiles()}. */
@Override
public List<String> extraFiles() {
return file.extraFiles();
}

/** Returns the {@code totalBuckets} value stored on this entry. */
public int totalBuckets() {
return totalBuckets;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -84,6 +85,15 @@ public long suggestedFileSize() {
return suggestedFileSize;
}

/**
 * Reads the manifest file {@code fileName} and converts every {@link ManifestEntry} in it to an
 * {@link ExpireFileEntry}.
 *
 * @param fileName name of the manifest file to read
 * @param fileSize file size handed through to {@code read}; may be {@code null}
 * @return the converted entries, in the order they were read
 */
public List<ExpireFileEntry> readExpireFileEntries(String fileName, @Nullable Long fileSize) {
    List<ManifestEntry> manifestEntries = read(fileName, fileSize);
    // Pre-size the target list: exactly one converted entry per entry read.
    List<ExpireFileEntry> converted = new ArrayList<>(manifestEntries.size());
    manifestEntries.forEach(
            manifestEntry -> converted.add(ExpireFileEntry.from(manifestEntry)));
    return converted;
}

/**
* Write several {@link ManifestEntry}s into manifest files.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public BinaryRow maxKey() {
return maxKey;
}

/** Returns the {@code extraFiles} list held by this entry (the field itself, not a copy). */
@Override
public List<String> extraFiles() {
return extraFiles;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
Expand Down Expand Up @@ -60,7 +60,7 @@ public ChangelogDeletion(
}

@Override
public void cleanUnusedDataFiles(Changelog changelog, Predicate<ManifestEntry> skipper) {
public void cleanUnusedDataFiles(Changelog changelog, Predicate<ExpireFileEntry> skipper) {
if (changelog.changelogManifestList() != null) {
deleteAddedDataFiles(changelog.changelogManifestList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileEntry.Identifier;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
Expand All @@ -46,15 +47,13 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Base class for file deletion including methods for clean data files, manifest files and empty
Expand Down Expand Up @@ -110,7 +109,7 @@ public FileDeletionBase(
* @param skipper if the test result of a data file is true, it will be skipped when deleting;
* else it will be deleted
*/
public abstract void cleanUnusedDataFiles(T snapshot, Predicate<ManifestEntry> skipper);
public abstract void cleanUnusedDataFiles(T snapshot, Predicate<ExpireFileEntry> skipper);

/**
* Clean metadata files that will not be used anymore of a snapshot, including data manifests,
Expand Down Expand Up @@ -164,21 +163,23 @@ public void cleanEmptyDirectories() {
deletionBuckets.clear();
}

// Records the bucket of the given entry under its partition in deletionBuckets, creating the
// per-partition set on first use.
protected void recordDeletionBuckets(ManifestEntry entry) {
protected void recordDeletionBuckets(ExpireFileEntry entry) {
deletionBuckets
.computeIfAbsent(entry.partition(), p -> new HashSet<>())
.add(entry.bucket());
}

public void cleanUnusedDataFiles(String manifestList, Predicate<ManifestEntry> skipper) {
public void cleanUnusedDataFiles(String manifestList, Predicate<ExpireFileEntry> skipper) {
// try read manifests
List<String> manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList));
List<ManifestEntry> manifestEntries;
List<ManifestFileMeta> manifests = tryReadManifestList(manifestList);
List<ExpireFileEntry> manifestEntries;
// data file path -> (original manifest entry, extra file paths)
Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new HashMap<>();
for (String manifest : manifestFileNames) {
Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete = new HashMap<>();
for (ManifestFileMeta manifest : manifests) {
try {
manifestEntries = manifestFile.read(manifest);
manifestEntries =
manifestFile.readExpireFileEntries(
manifest.fileName(), manifest.fileSize());
} catch (Exception e) {
// cancel deletion if any exception occurs
LOG.warn("Failed to read some manifest files. Cancel deletion.", e);
Expand All @@ -192,12 +193,12 @@ public void cleanUnusedDataFiles(String manifestList, Predicate<ManifestEntry> s
}

protected void doCleanUnusedDataFile(
Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
Predicate<ManifestEntry> skipper) {
Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete,
Predicate<ExpireFileEntry> skipper) {
List<Path> actualDataFileToDelete = new ArrayList<>();
dataFileToDelete.forEach(
(path, pair) -> {
ManifestEntry entry = pair.getLeft();
ExpireFileEntry entry = pair.getLeft();
// check whether we should skip the data file
if (!skipper.test(entry)) {
// delete data files
Expand All @@ -211,20 +212,20 @@ protected void doCleanUnusedDataFile(
}

protected void getDataFileToDelete(
Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
List<ManifestEntry> dataFileEntries) {
Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete,
List<ExpireFileEntry> dataFileEntries) {
// we cannot delete a data file directly when we meet a DELETE entry, because that
// file might be upgraded
for (ManifestEntry entry : dataFileEntries) {
for (ExpireFileEntry entry : dataFileEntries) {
Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket());
Path dataFilePath = new Path(bucketPath, entry.file().fileName());
Path dataFilePath = new Path(bucketPath, entry.fileName());
switch (entry.kind()) {
case ADD:
dataFileToDelete.remove(dataFilePath);
break;
case DELETE:
List<Path> extraFiles = new ArrayList<>(entry.file().extraFiles().size());
for (String file : entry.file().extraFiles()) {
List<Path> extraFiles = new ArrayList<>(entry.extraFiles().size());
for (String file : entry.extraFiles()) {
extraFiles.add(new Path(bucketPath, file));
}
dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles));
Expand All @@ -242,27 +243,28 @@ protected void getDataFileToDelete(
* @param manifestListName name of manifest list
*/
public void deleteAddedDataFiles(String manifestListName) {
// Each manifest of the list is read and processed on its own: the try/catch sits inside the
// loop, so a manifest that cannot be read is logged and the remaining ones are still handled.
List<String> manifestFileNames =
readManifestFileNames(tryReadManifestList(manifestListName));
for (String file : manifestFileNames) {
List<ManifestFileMeta> manifests = tryReadManifestList(manifestListName);
for (ManifestFileMeta manifest : manifests) {
try {
List<ManifestEntry> manifestEntries = manifestFile.read(file);
List<ExpireFileEntry> manifestEntries =
manifestFile.readExpireFileEntries(
manifest.fileName(), manifest.fileSize());
deleteAddedDataFiles(manifestEntries);
} catch (Exception e) {
// We want to delete the data file, so just ignore the unavailable files
LOG.info("Failed to read manifest " + file + ". Ignore it.", e);
LOG.info("Failed to read manifest " + manifest.fileName() + ". Ignore it.", e);
}
}
}

private void deleteAddedDataFiles(List<ManifestEntry> manifestEntries) {
private void deleteAddedDataFiles(List<ExpireFileEntry> manifestEntries) {
List<Path> dataFileToDelete = new ArrayList<>();
for (ManifestEntry entry : manifestEntries) {
for (ExpireFileEntry entry : manifestEntries) {
if (entry.kind() == FileKind.ADD) {
dataFileToDelete.add(
new Path(
pathFactory.bucketPath(entry.partition(), entry.bucket()),
entry.file().fileName()));
entry.fileName()));
recordDeletionBuckets(entry);
}
}
Expand Down Expand Up @@ -327,7 +329,7 @@ protected void cleanUnusedManifests(
cleanUnusedStatisticsManifests(snapshot, skippingSet);
}

public Predicate<ManifestEntry> createDataFileSkipperForTags(
public Predicate<ExpireFileEntry> createDataFileSkipperForTags(
List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws Exception {
int index = SnapshotManager.findPreviousSnapshot(taggedSnapshots, expiringSnapshotId);
// refresh tag data files
Expand Down Expand Up @@ -358,55 +360,46 @@ protected List<ManifestFileMeta> tryReadManifestList(String manifestListName) {
}
}

/**
 * Returns the manifest file names referenced by the base manifest list of {@code snapshot},
 * followed by those referenced by its delta manifest list.
 */
protected List<String> tryReadDataManifests(Snapshot snapshot) {
    List<ManifestFileMeta> baseAndDelta = tryReadManifestList(snapshot.baseManifestList());
    List<ManifestFileMeta> delta = tryReadManifestList(snapshot.deltaManifestList());
    // Delta metas are appended to the list that was returned for the base manifest list.
    baseAndDelta.addAll(delta);
    return readManifestFileNames(baseAndDelta);
}

/**
 * Collects the file name of every given manifest meta, in iteration order, into a new {@link
 * LinkedList}.
 */
protected List<String> readManifestFileNames(List<ManifestFileMeta> manifestFileMetas) {
    List<String> fileNames = new LinkedList<>();
    for (ManifestFileMeta meta : manifestFileMetas) {
        fileNames.add(meta.fileName());
    }
    return fileNames;
}

/**
 * Adds the file name of every entry returned by {@code readMergedDataFiles(snapshot)} to {@code
 * dataFiles}, grouped first by partition and then by bucket.
 *
 * <p>NOTE: This method is used for building the data file skipping set. If reading some manifest
 * fails, an exception is thrown, and callers must handle it.
 */
protected void addMergedDataFiles(
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot snapshot)
throws IOException {
for (ManifestEntry entry : readMergedDataFiles(snapshot)) {
for (ExpireFileEntry entry : readMergedDataFiles(snapshot)) {
// Create the per-partition map and per-bucket set on first use, then record the file name.
dataFiles
.computeIfAbsent(entry.partition(), p -> new HashMap<>())
.computeIfAbsent(entry.bucket(), b -> new HashSet<>())
.add(entry.file().fileName());
.add(entry.fileName());
}
}

// Reads every manifest referenced by the snapshot's base and delta manifest lists, merges their
// entries with FileEntry.mergeEntries into a map keyed by Identifier, and returns the map's values.
protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot) throws IOException {
protected Collection<ExpireFileEntry> readMergedDataFiles(Snapshot snapshot)
throws IOException {
// read data manifests
List<String> files = tryReadDataManifests(snapshot);

List<ManifestFileMeta> manifests = tryReadManifestList(snapshot.baseManifestList());
// NOTE(review): addAll mutates the list returned by tryReadManifestList for the base manifest
// list - confirm that method never hands back an immutable list (e.g. after a failed read).
manifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));

// read and merge manifest entries
Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
for (String manifest : files) {
List<ManifestEntry> entries;
entries = manifestFile.readWithIOException(manifest);
Map<Identifier, ExpireFileEntry> map = new HashMap<>();
for (ManifestFileMeta manifest : manifests) {
List<ExpireFileEntry> entries =
manifestFile.readExpireFileEntries(manifest.fileName(), manifest.fileSize());
FileEntry.mergeEntries(entries, map);
}

return map.values();
}

protected boolean containsDataFile(
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ManifestEntry testee) {
Map<Integer, Set<String>> buckets = dataFiles.get(testee.partition());
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ExpireFileEntry entry) {
Map<Integer, Set<String>> buckets = dataFiles.get(entry.partition());
if (buckets != null) {
Set<String> fileNames = buckets.get(testee.bucket());
Set<String> fileNames = buckets.get(entry.bucket());
if (fileNames != null) {
return fileNames.contains(testee.file().fileName());
return fileNames.contains(entry.fileName());
}
}
return false;
Expand Down
Loading

0 comments on commit 9191e2e

Please sign in to comment.