Skip to content

Commit

Permalink
[core] Introduce Table.snapshot(long snapshotId) (apache#3982)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Aug 16, 2024
1 parent fa1f373 commit 154126a
Show file tree
Hide file tree
Showing 23 changed files with 173 additions and 136 deletions.
107 changes: 5 additions & 102 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,9 @@

package org.apache.paimon;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -35,11 +31,7 @@

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand All @@ -63,7 +55,10 @@
* commitIdentifier (which is a long value). Json can automatically perform type conversion so
* there is no compatibility issue.
* </ul>
*
* @since 0.9.0
*/
@Public
@JsonIgnoreProperties(ignoreUnknown = true)
public class Snapshot {

Expand Down Expand Up @@ -351,85 +346,6 @@ public String statistics() {
return statistics;
}

/**
* Return all {@link ManifestFileMeta} instances for either data or changelog manifests in this
* snapshot.
*
* @param manifestList a {@link ManifestList} instance used for reading files at snapshot.
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> allManifests(ManifestList manifestList) {
List<ManifestFileMeta> result = new ArrayList<>();
result.addAll(dataManifests(manifestList));
result.addAll(changelogManifests(manifestList));
return result;
}

/**
* Return a {@link ManifestFileMeta} for each data manifest in this snapshot.
*
* @param manifestList a {@link ManifestList} instance used for reading files at snapshot.
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> dataManifests(ManifestList manifestList) {
List<ManifestFileMeta> result = new ArrayList<>();
result.addAll(manifestList.read(baseManifestList));
result.addAll(deltaManifests(manifestList));
return result;
}

/**
* Return a {@link ManifestFileMeta} for each delta manifest in this snapshot.
*
* @param manifestList a {@link ManifestList} instance used for reading files at snapshot.
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> deltaManifests(ManifestList manifestList) {
return manifestList.read(deltaManifestList);
}

/**
* Return a {@link ManifestFileMeta} for each changelog manifest in this snapshot.
*
* @param manifestList a {@link ManifestList} instance used for reading files at snapshot.
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> changelogManifests(ManifestList manifestList) {
return changelogManifestList == null
? Collections.emptyList()
: manifestList.read(changelogManifestList);
}

/**
* Return record count of all changes occurred in this snapshot given the scan.
*
* @param scan a {@link FileStoreScan} instance used for count of reading files at snapshot.
* @return total record count of Snapshot.
*/
public Long totalRecordCount(FileStoreScan scan) {
return totalRecordCount == null
? recordCount(scan.withSnapshot(id).plan().files())
: totalRecordCount;
}

public static long recordCount(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum();
}

public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream()
.filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind()))
.mapToLong(manifest -> manifest.file().rowCount())
.sum();
}

public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream()
.filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind()))
.mapToLong(manifest -> manifest.file().rowCount())
.sum();
}

public String toJson() {
return JsonSerdeUtil.toJson(this);
}
Expand All @@ -440,25 +356,12 @@ public static Snapshot fromJson(String json) {

public static Snapshot fromPath(FileIO fileIO, Path path) {
try {
return fromPathThrowsException(fileIO, path);
return Snapshot.fromJson(fileIO.readFileUtf8(path));
} catch (IOException e) {
throw new RuntimeException("Fails to read snapshot from path " + path, e);
}
}

@Nullable
public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException {
try {
return fromPathThrowsException(fileIO, path);
} catch (FileNotFoundException e) {
return null;
}
}

private static Snapshot fromPathThrowsException(FileIO fileIO, Path path) throws IOException {
return Snapshot.fromJson(fileIO.readFileUtf8(path));
}

@Override
public int hashCode() {
return Objects.hash(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,22 @@ public static Filter<InternalRow> createEntryRowFilter(
return true;
};
}

public static long recordCount(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum();
}

public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream()
.filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind()))
.mapToLong(manifest -> manifest.file().rowCount())
.sum();
}

public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream()
.filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind()))
.mapToLong(manifest -> manifest.file().rowCount())
.sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.manifest;

import org.apache.paimon.Snapshot;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
Expand All @@ -32,6 +33,8 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
Expand Down Expand Up @@ -60,6 +63,51 @@ private ManifestList(
cache);
}

/**
* Return all {@link ManifestFileMeta} instances for either data or changelog manifests in this
* snapshot.
*
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> readAllManifests(Snapshot snapshot) {
List<ManifestFileMeta> result = new ArrayList<>();
result.addAll(readDataManifests(snapshot));
result.addAll(readChangelogManifests(snapshot));
return result;
}

/**
* Return a {@link ManifestFileMeta} for each data manifest in this snapshot.
*
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> readDataManifests(Snapshot snapshot) {
List<ManifestFileMeta> result = new ArrayList<>();
result.addAll(read(snapshot.baseManifestList()));
result.addAll(readDeltaManifests(snapshot));
return result;
}

/**
* Return a {@link ManifestFileMeta} for each delta manifest in this snapshot.
*
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> readDeltaManifests(Snapshot snapshot) {
return read(snapshot.deltaManifestList());
}

/**
* Return a {@link ManifestFileMeta} for each changelog manifest in this snapshot.
*
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> readChangelogManifests(Snapshot snapshot) {
return snapshot.changelogManifestList() == null
? Collections.emptyList()
: read(snapshot.changelogManifestList());
}

/**
* Write several {@link ManifestFileMeta}s into a manifest list.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,18 +403,18 @@ private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
switch (scanMode) {
case ALL:
return snapshot.dataManifests(manifestList);
return manifestList.readDataManifests(snapshot);
case DELTA:
return snapshot.deltaManifests(manifestList);
return manifestList.readDeltaManifests(snapshot);
case CHANGELOG:
if (snapshot.version() > Snapshot.TABLE_STORE_02_VERSION) {
return snapshot.changelogManifests(manifestList);
return manifestList.readChangelogManifests(snapshot);
}

// compatible with Paimon 0.2, we'll read extraFiles in DataFileMeta
// see comments on DataFileMeta#extraFiles
if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
return snapshot.deltaManifests(manifestList);
return manifestList.readDeltaManifests(snapshot);
}
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public Predicate<ManifestEntry> createDataFileSkipperForTags(
* It is possible that a job was killed during expiration and some manifest files have been
* deleted, so if the clean methods need to get manifests of a snapshot to be cleaned, we should
* try to read manifests and return empty list if failed instead of calling {@link
* Snapshot#dataManifests} directly.
* ManifestList#readDataManifests} directly.
*/
protected List<ManifestFileMeta> tryReadManifestList(String manifestListName) {
try {
Expand Down Expand Up @@ -424,7 +424,7 @@ public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
// data manifests
skippingSet.add(skippingSnapshot.baseManifestList());
skippingSet.add(skippingSnapshot.deltaManifestList());
skippingSnapshot.dataManifests(manifestList).stream()
manifestList.readDataManifests(skippingSnapshot).stream()
.map(ManifestFileMeta::fileName)
.forEach(skippingSet::add);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
Expand Down Expand Up @@ -839,9 +842,9 @@ boolean tryCommitOnce(
Long currentWatermark = watermark;
String previousIndexManifest = null;
if (latestSnapshot != null) {
previousTotalRecordCount = latestSnapshot.totalRecordCount(scan);
previousTotalRecordCount = scan.totalRecordCount(latestSnapshot);
List<ManifestFileMeta> previousManifests =
latestSnapshot.dataManifests(manifestList);
manifestList.readDataManifests(latestSnapshot);
// read all previous manifest files
oldMetas.addAll(previousManifests);
// read the last snapshot to complete the bucket's offsets when logOffsets does not
Expand Down Expand Up @@ -872,8 +875,7 @@ boolean tryCommitOnce(
previousChangesListName = manifestList.write(newMetas);

// the added records subtract the deleted records from
long deltaRecordCount =
Snapshot.recordCountAdd(tableFiles) - Snapshot.recordCountDelete(tableFiles);
long deltaRecordCount = recordCountAdd(tableFiles) - recordCountDelete(tableFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;

// write new changes into manifest files
Expand Down Expand Up @@ -928,7 +930,7 @@ boolean tryCommitOnce(
logOffsets,
totalRecordCount,
deltaRecordCount,
Snapshot.recordCount(changelogFiles),
recordCount(changelogFiles),
currentWatermark,
statsFileName);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.manifest.ManifestEntry.recordCount;

/** Scan operation which produces a plan. */
public interface FileStoreScan {

Expand Down Expand Up @@ -77,6 +79,17 @@ public interface FileStoreScan {
/** Produce a {@link Plan}. */
Plan plan();

/**
* Return record count of all changes occurred in this snapshot given the scan.
*
* @return total record count of Snapshot.
*/
default Long totalRecordCount(Snapshot snapshot) {
return snapshot.totalRecordCount() == null
? (Long) recordCount(withSnapshot(snapshot.id()).plan().files())
: snapshot.totalRecordCount();
}

/**
* Read {@link SimpleFileEntry}s, SimpleFileEntry only retains some critical information, so it
* cannot perform filtering based on statistical information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public OptionalLong latestSnapshotId() {
return snapshot == null ? OptionalLong.empty() : OptionalLong.of(snapshot);
}

@Override
public Snapshot snapshot(long snapshotId) {
return store().snapshotManager().snapshot(snapshotId);
}

@Override
public String name() {
return identifier().getObjectName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
Expand Down Expand Up @@ -128,6 +129,11 @@ public OptionalLong latestSnapshotId() {
return wrapped.latestSnapshotId();
}

@Override
public Snapshot snapshot(long snapshotId) {
return wrapped.snapshot(snapshotId);
}

@Override
public void rollbackTo(long snapshotId) {
wrapped.rollbackTo(snapshotId);
Expand Down
Loading

0 comments on commit 154126a

Please sign in to comment.