Skip to content

Commit

Permalink
expire snapshot; file deletion.
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Jul 15, 2024
1 parent 01773e7 commit d651ef4
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,18 +280,26 @@ public static Object convertJavaObject(DataType literalType, Object o) {
int scale = decimalType.getScale();
return Decimal.fromBigDecimal((BigDecimal) o, precision, scale);
case TIMESTAMP_WITHOUT_TIME_ZONE:
Timestamp ts;
// spark PushedFilters 结果是将 Timestamp 转为了时区无关的 Instant
// 这里如果是 Instant 类型则需要重新转换为时区感知的时间
// 参考 DateTimeUtils
Timestamp timestamp;
System.out.println(ZoneId.systemDefault());
if (o instanceof Instant) {
if (o instanceof java.sql.Timestamp) {
timestamp = Timestamp.fromSQLTimestamp((java.sql.Timestamp) o);
} else if (o instanceof Instant) {
Instant instant = (Instant) o;
LocalDateTime localDateTime = instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
ts = Timestamp.fromLocalDateTime(localDateTime);
LocalDateTime localDateTime =
instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
timestamp = Timestamp.fromLocalDateTime(localDateTime);
} else if (o instanceof LocalDateTime) {
timestamp = Timestamp.fromLocalDateTime((LocalDateTime) o);
} else {
throw new UnsupportedOperationException("Unsupported object: " + o);
}
return ts;
return timestamp;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
Timestamp timestamp;
// Timestamp timestamp;
if (o instanceof java.sql.Timestamp) {
timestamp = Timestamp.fromSQLTimestamp((java.sql.Timestamp) o);
} else if (o instanceof Instant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public static Map<String, String> parseCommaSeparatedKeyValues(String keyValues)
return kvs;
}


public static void parseKeyValueString(Map<String, String> map, String kvString) {
String[] kv = kvString.split("=", 2);
if (kv.length != 2) {
Expand All @@ -62,11 +61,7 @@ public Map<String, String> parseCommaSeparateKeyValuesAskwang(String keyValues)
String[] kv = kvString.split("=", 2);
if (kv.length != 2) {
throw new IllegalArgumentException(
String.format(
"Invalid key-value string '%s'",
kvString
)
);
String.format("Invalid key-value string '%s'", kvString));
}
kvs.put(kv[0], kv[1]);
}
Expand Down
3 changes: 2 additions & 1 deletion paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ public List<ManifestFileMeta> allManifests(ManifestList manifestList) {
public List<ManifestFileMeta> dataManifests(ManifestList manifestList) {
List<ManifestFileMeta> result = new ArrayList<>();
result.addAll(manifestList.read(baseManifestList));
result.addAll(deltaManifests(manifestList));
// result.addAll(deltaManifests(manifestList));
result.addAll(manifestList.read(deltaManifestList));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ static void mergeEntries(
}
}

// 读取 manifest file 中的内容,对文件进行 ADD 或 DELETE
static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifier, T> map) {
for (T entry : entries) {
Identifier identifier = entry.identifier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Metadata of a manifest file. */
/** Metadata of a manifest file. 区分于 ManifestEntry,ManifestEntry是 manifest file 的内容. */
public class ManifestFileMeta {

private static final Logger LOG = LoggerFactory.getLogger(ManifestFileMeta.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;

/**
* Universal Compaction Style is a compaction style, targeting the use cases requiring lower write
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ protected void recordDeletionBuckets(ManifestEntry entry) {
.add(entry.bucket());
}

// 这里的 manifestList 是 snapshot.deltaManifestList()
public void cleanUnusedDataFiles(String manifestList, Predicate<ManifestEntry> skipper) {
// try read manifests
List<String> manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList));
Expand Down Expand Up @@ -193,6 +194,7 @@ protected void doCleanUnusedDataFile(
dataFileToDelete.forEach(
(path, pair) -> {
ManifestEntry entry = pair.getLeft();
// skipper.test(entry)=true, 表示 entry 被忽略,不被删除
// check whether we should skip the data file
if (!skipper.test(entry)) {
// delete data files
Expand Down Expand Up @@ -295,10 +297,12 @@ public void cleanUnusedManifestList(String manifestName, Set<String> skippingSet
String fileName = manifest.fileName();
if (!skippingSet.contains(fileName)) {
toDeleteManifests.add(fileName);
// 已经删除的 manifest-file 添加到 skippingSet,避免多次删除
// to avoid other snapshots trying to delete again
skippingSet.add(fileName);
}
}
// 删除 manifest-list 文件
if (!skippingSet.contains(manifestName)) {
toDeleteManifests.add(manifestName);
}
Expand Down Expand Up @@ -327,7 +331,7 @@ public Predicate<ManifestEntry> dataFileSkipper(
addMergedDataFiles(cachedTagDataFiles, taggedSnapshots.get(index));
}

return entry -> index >= 0 && containsDataFile(cachedTagDataFiles, entry);
return entry -> (index >= 0 && containsDataFile(cachedTagDataFiles, entry));
}

/**
Expand Down Expand Up @@ -364,6 +368,7 @@ protected List<String> readManifestFileNames(List<ManifestFileMeta> manifestFile
protected void addMergedDataFiles(
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot snapshot)
throws IOException {
// 从 snapshot 对应的 manifest 读取 DataFile 文件
for (ManifestEntry entry : readMergedDataFiles(snapshot)) {
dataFiles
.computeIfAbsent(entry.partition(), p -> new HashMap<>())
Expand All @@ -373,6 +378,7 @@ protected void addMergedDataFiles(
}

protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot) throws IOException {
// 读取 baseManifestList 和 deltaManifestList 中包含的 manifestfile
// read data manifests
List<String> files = tryReadDataManifests(snapshot);

Expand All @@ -384,6 +390,7 @@ protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot) throw
FileEntry.mergeEntries(entries, map);
}

// 只取 value 对象 ManifestEntry
return map.values();
}

Expand All @@ -408,6 +415,7 @@ public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
Set<String> skippingSet = new HashSet<>();

for (Snapshot skippingSnapshot : skippingSnapshots) {
// manifest-list and manifest-file
// data manifests
skippingSet.add(skippingSnapshot.baseManifestList());
skippingSet.add(skippingSnapshot.deltaManifestList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ public int expire() {
Preconditions.checkArgument(
retainMax >= retainMin, "retainMax must greater than retainMin.");

// example, snapshot[1,2,3,4,5,6,7,8,9,10]
// minRetain = 2, maxRetain = 5, maxDeletes=4
// min=max(10-5+1,1)=6
// maxExclusive=10-2+1=9
// maxExclusive = min(9, 1+4)=5

// the min snapshot to retain from 'snapshot.num-retained.max'
// (the maximum number of snapshots to retain)
long min = Math.max(latestSnapshotId - retainMax + 1, earliest);
Expand All @@ -103,23 +109,29 @@ public int expire() {
// (the minimum number of completed snapshots to retain)
long maxExclusive = latestSnapshotId - retainMin + 1;

// Consumer which contains next snapshot.
// 最早的 consumer snapshot id 是最早需要读取的 snapshot,不可被删除
// the snapshot being read by the consumer cannot be deleted
maxExclusive =
Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));

// protected by 'snapshot.expire.limit'
// (the maximum number of snapshots allowed to expire at a time)
// askwang-todo: 考虑 maxDelete 后,min 就有可能大于 maxExclusive,则直接 expireUtil(1,5)
maxExclusive = Math.min(maxExclusive, earliest + maxDeletes);

// snapshot(i).timeMillis + snapshot.time-retained >= system.currentTime
// 则认定为该 snapshot 没有过期。比如 10点创建,保留1小时,当前是 10:30,表示没有过期
for (long id = min; id < maxExclusive; id++) {
// Early exit the loop for 'snapshot.time-retained'
// (the maximum time of snapshots to retain)
if (snapshotManager.snapshotExists(id)
&& olderThanMills <= snapshotManager.snapshot(id).timeMillis()) {
// 如果 id 没有过期,则过期清理的 snapshot id 范围为 [earliest, id)
return expireUntil(earliest, id);
}
}

// [min, maxExclusive) 之间的 snapshot 都已过期
return expireUntil(earliest, maxExclusive);
}

Expand Down Expand Up @@ -155,8 +167,11 @@ public int expireUntil(long earliestId, long endExclusiveId) {
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
}

// tag 依赖的 snapshot
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();

// 删除文件:删除一个 snapshot 下的文件,前提条件是下一个 snapshot 没有使用该文件
// askwang-todo: 为什么要这么遍历?
// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
// id should be (beginInclusiveId, endExclusiveId]
Expand All @@ -168,6 +183,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
// expire merge tree files and collect changed buckets
Predicate<ManifestEntry> skipper;
try {
// 没有 tag 的情况,skipper 始终为 false
skipper = snapshotDeletion.dataFileSkipper(taggedSnapshots, id);
} catch (Exception e) {
LOG.info(
Expand Down Expand Up @@ -196,29 +212,35 @@ public int expireUntil(long earliestId, long endExclusiveId) {

// data files and changelog files in bucket directories has been deleted
// then delete changed bucket directories if they are empty
// 从 bucket=-1 -> bucket=2,清理snapshot/file 时会清理之前动态 bucket 生成的文件和空的 bucket 目录
if (cleanEmptyDirectories) {
snapshotDeletion.cleanDataDirectories();
}

// 过滤出不应该被删除的 snapshot manifest 和文件,因为 tag snapshot 会依赖
// delete manifests and indexFiles
List<Snapshot> skippingSnapshots =
TagManager.findOverlappedSnapshots(
taggedSnapshots, beginInclusiveId, endExclusiveId);
skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
Set<String> skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots);

for (long id = beginInclusiveId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete manifests in snapshot #" + id);
}

Snapshot snapshot = snapshotManager.snapshot(id);
// clean manifest and indexFile
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
if (expireConfig.isChangelogDecoupled()) {
commitChangelog(new Changelog(snapshot));
}
// snapshot id 删除的范围是 [bengin, end)
snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id));
}

// 更新 snapshot/EARLIEST 文件
writeEarliestHint(endExclusiveId);
return (int) (endExclusiveId - beginInclusiveId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
for (Path path : paths) {
String tagName = path.getName().substring(TAG_PREFIX.length());

// Predicate<String> filter = s -> true. 只要是字符串就返回 true
// Predicate<String> filter = s -> true. 只要是字符串就返回 true,if条件不满足
if (!filter.test(tagName)) {
continue;
}
Expand Down Expand Up @@ -412,6 +412,7 @@ public static List<Snapshot> findOverlappedSnapshots(
List<Snapshot> snapshots = new ArrayList<>();
int right = findPreviousTag(taggedSnapshots, endExclusive);
if (right >= 0) {
// askwang-todo: left 会多一个无效的值,参考 findNextOrEqualTagAskwang
int left = Math.max(findPreviousOrEqualTag(taggedSnapshots, beginInclusive), 0);
for (int i = left; i <= right; i++) {
snapshots.add(taggedSnapshots.get(i));
Expand All @@ -420,6 +421,25 @@ public static List<Snapshot> findOverlappedSnapshots(
return snapshots;
}

/**
* 找到 taggedSnapshots 和 snapshot [begin, end) id 直接重叠的 snapshot 缩小 taggedSnapshots 中的 snapshot.
* id 的范围 [left, right); 第一次小于或等于 begin 的 id 为 left,(优化:第一次大于或等于 begin 的 id 为 left) 第一次小于 end 的.
* id 为 right.
*/
private static List<Snapshot> findOverlappedSnapshotsAskwang(
List<Snapshot> taggedSnapshot, long beginInclusive, long endExclusive) {
List<Snapshot> snapshots = new ArrayList<>();
int right = findPreviousTag(taggedSnapshot, endExclusive);
if (right >= 0) {
int left = Math.max(findNextOrEqualTagAskwang(taggedSnapshot, beginInclusive), 0);
for (int i = left; i <= right; i++) {
snapshots.add(taggedSnapshot.get(i));
}
}

return snapshots;
}

public static int findPreviousTag(List<Snapshot> taggedSnapshots, long targetSnapshotId) {
for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
if (taggedSnapshots.get(i).id() < targetSnapshotId) {
Expand All @@ -429,6 +449,17 @@ public static int findPreviousTag(List<Snapshot> taggedSnapshots, long targetSna
return -1;
}

/** 减少不重叠的 snapshot 范围 [begin, end): [10, 15) taggedSnapshots: [7, 9, 11, 12]. */
private static int findNextOrEqualTagAskwang(
List<Snapshot> taggedSnapshots, long targetSnapshotId) {
for (int i = 0; i < targetSnapshotId; i++) {
if (taggedSnapshots.get(i).id() >= targetSnapshotId) {
return i;
}
}
return -1;
}

private static int findPreviousOrEqualTag(
List<Snapshot> taggedSnapshots, long targetSnapshotId) {
for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -75,7 +74,7 @@ public void testBucketHashCode() {
int recordNum = 10;
for (int i = 1; i <= recordNum; i++) {
iList.add(i);
list.add(bucket(extractor("", "a"), GenericRow.of(i,2,3)));
list.add(bucket(extractor("", "a"), GenericRow.of(i, 2, 3)));
}

System.out.println("origin: " + list.size());
Expand Down Expand Up @@ -109,8 +108,6 @@ private FixedBucketRowKeyExtractor extractor(String bk, String pk) {
return extractor("", bk, pk);
}



private FixedBucketRowKeyExtractor extractor(
String partK, String bk, String pk, int numBucket) {
RowType rowType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ abstract class PaimonBaseScanBuilder(table: Table)
* should be returned from the data source if and only if all of the filters match. That is,
* filters must be interpreted as ANDed together.
*
* <p>spark-sql INFO V2ScanRelationPushDown:
* Pushing operators to z_paimon_pk_table
* Pushed Filters: IsNotNull(dt), EqualTo(dt,2024-04-11T03:01:00Z)
* Post-Scan Filters: isnotnull(dt#3),(dt#3 = 2024-04-11 11:01:00)
* spark 侧输出的 Post-scan Filter 没问题是 spark 进行了 filter 的 translateFilterWithMapping,
* 之后又进行了 rebuildExpressionFromFilter 还原
* <p>spark-sql INFO V2ScanRelationPushDown: Pushing operators to z_paimon_pk_table Pushed
* Filters: IsNotNull(dt), EqualTo(dt,2024-04-11T03:01:00Z) Post-Scan Filters:
* isnotnull(dt#3),(dt#3 = 2024-04-11 11:01:00) spark 侧输出的 Post-scan Filter 没问题是 spark 进行了 filter
* 的 translateFilterWithMapping, 之后又进行了 rebuildExpressionFromFilter 还原
*/
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
// spark 传进来的 filter: Array[Filter] 就是转换为 utc 类型的. EqualTo(dt,2024-04-11T03:01:00Z)
Expand Down
Loading

0 comments on commit d651ef4

Please sign in to comment.