From 46609f27f99303adeba14f3889a38e34a45780f2 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Mon, 1 Apr 2024 15:22:04 +0800 Subject: [PATCH] [core] Introduce ExpireChangelogImpl to decouple the changelog lifecycle (#3110) --- .../generated/core_configuration.html | 18 ++ .../java/org/apache/paimon/CoreOptions.java | 41 +++++ .../java/org/apache/paimon/Changelog.java | 131 +++++++++++++ .../main/java/org/apache/paimon/Snapshot.java | 38 ++-- .../paimon/operation/FileDeletionBase.java | 68 ++++--- .../paimon/operation/SnapshotDeletion.java | 7 +- .../paimon/schema/SchemaValidation.java | 11 ++ .../paimon/table/AbstractFileStoreTable.java | 28 ++- .../paimon/table/ExpireChangelogImpl.java | 172 ++++++++++++++++++ .../paimon/table/ExpireSnapshotsImpl.java | 83 +++++++-- .../apache/paimon/table/ReadonlyTable.java | 8 + .../apache/paimon/table/RollbackHelper.java | 43 +++++ .../java/org/apache/paimon/table/Table.java | 3 + .../table/source/AbstractInnerTableScan.java | 16 +- .../source/InnerStreamTableScanImpl.java | 24 +-- ...ContinuousFromSnapshotStartingScanner.java | 31 +++- ...ontinuousFromTimestampStartingScanner.java | 13 +- .../paimon/utils/NextSnapshotFetcher.java | 74 ++++++++ .../apache/paimon/utils/SnapshotManager.java | 140 ++++++++++---- .../java/org/apache/paimon/TestFileStore.java | 118 +++++++++++- .../paimon/operation/ExpireSnapshotsTest.java | 35 ++++ .../paimon/operation/FileDeletionTest.java | 1 + .../paimon/table/FileStoreTableTestBase.java | 6 + .../paimon/table/source/StartupModeTest.java | 4 +- .../paimon/table/source/TableScanTest.java | 2 +- ...nuousFromTimestampStartingScannerTest.java | 81 ++++++++- .../source/snapshot/ScannerTestBase.java | 12 +- .../paimon/utils/SnapshotManagerTest.java | 49 ++++- .../flink/ContinuousFileStoreITCase.java | 42 +++++ 29 files changed, 1152 insertions(+), 147 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/Changelog.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f2e4a6a044e6..3f4c76bf03aa 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -62,6 +62,24 @@ Boolean Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction. + +
changelog.num-retained.max
+ infinite + Integer + The maximum number of completed changelog to retain. Should be greater than or equal to the minimum number. + + +
changelog.num-retained.min
+ 10 + Integer + The minimum number of completed changelog to retain. Should be greater than or equal to 1. + + +
changelog.time-retained
+ 1 h + Duration + The maximum time of completed changelog to retain. +
commit.callback.#.param
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index ead337561b3e..895da7fe8c68 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -191,6 +191,27 @@ public class CoreOptions implements Serializable { .defaultValue(Duration.ofHours(1)) .withDescription("The maximum time of completed snapshots to retain."); + public static final ConfigOption CHANGELOG_NUM_RETAINED_MIN = + key("changelog.num-retained.min") + .intType() + .defaultValue(10) + .withDescription( + "The minimum number of completed changelog to retain. Should be greater than or equal to 1."); + + @Documentation.OverrideDefault("infinite") + public static final ConfigOption CHANGELOG_NUM_RETAINED_MAX = + key("changelog.num-retained.max") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription( + "The maximum number of completed changelog to retain. Should be greater than or equal to the minimum number."); + + public static final ConfigOption CHANGELOG_TIME_RETAINED = + key("changelog.time-retained") + .durationType() + .defaultValue(Duration.ofHours(1)) + .withDescription("The maximum time of completed changelog to retain."); + public static final ConfigOption SNAPSHOT_EXPIRE_EXECUTION_MODE = key("snapshot.expire.execution-mode") .enumType(ExpireExecutionMode.class) @@ -1214,6 +1235,26 @@ public Duration snapshotTimeRetain() { return options.get(SNAPSHOT_TIME_RETAINED); } + public int changelogNumRetainMin() { + return options.get(CHANGELOG_NUM_RETAINED_MIN); + } + + public int changelogNumRetainMax() { + return options.get(CHANGELOG_NUM_RETAINED_MAX); + } + + public Duration changelogTimeRetain() { + return options.get(CHANGELOG_TIME_RETAINED); + } + + public boolean changelogLifecycleDecoupled() { + return changelogNumRetainMax() > snapshotNumRetainMax() + || options.get(CHANGELOG_TIME_RETAINED) + .compareTo(options.get(SNAPSHOT_TIME_RETAINED)) + > 0 + || changelogNumRetainMin() > snapshotNumRetainMin(); + } + public ExpireExecutionMode snapshotExpireExecutionMode() { return options.get(SNAPSHOT_EXPIRE_EXECUTION_MODE); } diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java b/paimon-core/src/main/java/org/apache/paimon/Changelog.java new file mode 100644 index 000000000000..1d87e299c9f0 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Map; + +/** + * The metadata of changelog. It generates from the snapshot file during expiration. So that the + * changelog of the table can outlive the snapshot's lifecycle. A table's changelog can come from + * one source: + *
  • The changelog file. Eg: from the changelog-producer = 'input' + */ +public class Changelog extends Snapshot { + + private static final int CURRENT_VERSION = Snapshot.CURRENT_VERSION; + + public Changelog( + long id, + long schemaId, + String baseManifestList, + String deltaManifestList, + @Nullable String changelogManifestList, + @Nullable String indexManifest, + String commitUser, + long commitIdentifier, + CommitKind commitKind, + long timeMillis, + Map logOffsets, + @Nullable Long totalRecordCount, + @Nullable Long deltaRecordCount, + @Nullable Long changelogRecordCount, + @Nullable Long watermark, + @Nullable String statistics) { + this( + CURRENT_VERSION, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + } + + @JsonCreator + public Changelog( + @JsonProperty(FIELD_VERSION) @Nullable Integer version, + @JsonProperty(FIELD_ID) long id, + @JsonProperty(FIELD_SCHEMA_ID) long schemaId, + @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList, + @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList, + @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList, + @JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest, + @JsonProperty(FIELD_COMMIT_USER) String commitUser, + @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier, + @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind, + @JsonProperty(FIELD_TIME_MILLIS) long timeMillis, + @JsonProperty(FIELD_LOG_OFFSETS) Map logOffsets, + @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount, + @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount, + @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount, + @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, + @JsonProperty(FIELD_STATISTICS) @Nullable String statistics) { + super( + version, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + } + + public static Changelog fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Changelog.class); + } + + public static Changelog fromPath(FileIO fileIO, Path path) { + try { + String json = fileIO.readFileUtf8(path); + return Changelog.fromJson(json); + } catch (IOException e) { + throw new RuntimeException("Fails to read changelog from path " + path, e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index c9bc2dba840e..33ea8f4bb7ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -69,25 +69,25 @@ public class Snapshot { public static final long FIRST_SNAPSHOT_ID = 1; public static final int TABLE_STORE_02_VERSION = 1; - private static final int CURRENT_VERSION = 3; - - private static final String FIELD_VERSION = "version"; - private static final String FIELD_ID = "id"; - private static final String FIELD_SCHEMA_ID = "schemaId"; - private static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList"; - private static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList"; - private static final String FIELD_CHANGELOG_MANIFEST_LIST = "changelogManifestList"; - private static final String FIELD_INDEX_MANIFEST = "indexManifest"; - private static final String FIELD_COMMIT_USER = "commitUser"; - private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier"; - private static final String FIELD_COMMIT_KIND = "commitKind"; - private static final String FIELD_TIME_MILLIS = "timeMillis"; - private static final String FIELD_LOG_OFFSETS = "logOffsets"; - private static final String FIELD_TOTAL_RECORD_COUNT = "totalRecordCount"; - private static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount"; - private static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount"; - private static final String FIELD_WATERMARK = "watermark"; - private static final String FIELD_STATISTICS = "statistics"; + protected static final int CURRENT_VERSION = 3; + + protected static final String FIELD_VERSION = "version"; + protected static final String FIELD_ID = "id"; + protected static final String FIELD_SCHEMA_ID = "schemaId"; + protected static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList"; + protected static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList"; + protected static final String FIELD_CHANGELOG_MANIFEST_LIST = "changelogManifestList"; + protected static final String FIELD_INDEX_MANIFEST = "indexManifest"; + protected static final String FIELD_COMMIT_USER = "commitUser"; + protected static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier"; + protected static final String FIELD_COMMIT_KIND = "commitKind"; + protected static final String FIELD_TIME_MILLIS = "timeMillis"; + protected static final String FIELD_LOG_OFFSETS = "logOffsets"; + protected static final String FIELD_TOTAL_RECORD_COUNT = "totalRecordCount"; + protected static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount"; + protected static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount"; + protected static final String FIELD_WATERMARK = "watermark"; + protected static final String FIELD_STATISTICS = "statistics"; // version of snapshot // null for paimon <= 0.2 diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index 79a64b049916..ded5bedd0ae5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -151,40 +151,14 @@ protected void recordDeletionBuckets(ManifestEntry entry) { .add(entry.bucket()); } - protected void cleanUnusedManifests( - Snapshot snapshot, Set skippingSet, boolean deleteChangelog) { - // clean base and delta manifests - List toDeleteManifests = new ArrayList<>(); - List toExpireManifests = new ArrayList<>(); - toExpireManifests.addAll(tryReadManifestList(snapshot.baseManifestList())); - toExpireManifests.addAll(tryReadManifestList(snapshot.deltaManifestList())); - for (ManifestFileMeta manifest : toExpireManifests) { - String fileName = manifest.fileName(); - if (!skippingSet.contains(fileName)) { - toDeleteManifests.add(fileName); - // to avoid other snapshots trying to delete again - skippingSet.add(fileName); - } - } - deleteFiles(toDeleteManifests, manifestFile::delete); - - toDeleteManifests.clear(); - if (!skippingSet.contains(snapshot.baseManifestList())) { - toDeleteManifests.add(snapshot.baseManifestList()); - } - if (!skippingSet.contains(snapshot.deltaManifestList())) { - toDeleteManifests.add(snapshot.deltaManifestList()); - } - deleteFiles(toDeleteManifests, manifestList::delete); - - // clean changelog manifests - if (deleteChangelog && snapshot.changelogManifestList() != null) { - deleteFiles( - tryReadManifestList(snapshot.changelogManifestList()), - manifest -> manifestFile.delete(manifest.fileName())); - manifestList.delete(snapshot.changelogManifestList()); + public void cleanUnusedStatisticsManifests(Snapshot snapshot, Set skippingSet) { + // clean statistics + if (snapshot.statistics() != null && !skippingSet.contains(snapshot.statistics())) { + statsFileHandler.deleteStats(snapshot.statistics()); } + } + public void cleanUnusedIndexManifests(Snapshot snapshot, Set skippingSet) { // clean index manifests String indexManifest = snapshot.indexManifest(); // check exists, it may have been deleted by other snapshots @@ -199,11 +173,35 @@ protected void cleanUnusedManifests( indexFileHandler.deleteManifest(indexManifest); } } + } - // clean statistics - if (snapshot.statistics() != null && !skippingSet.contains(snapshot.statistics())) { - statsFileHandler.deleteStats(snapshot.statistics()); + public void cleanUnusedManifestList(String manifestName, Set skippingSet) { + List toDeleteManifests = new ArrayList<>(); + List toExpireManifests = tryReadManifestList(manifestName); + for (ManifestFileMeta manifest : toExpireManifests) { + String fileName = manifest.fileName(); + if (!skippingSet.contains(fileName)) { + toDeleteManifests.add(fileName); + // to avoid other snapshots trying to delete again + skippingSet.add(fileName); + } + } + if (!skippingSet.contains(manifestName)) { + toDeleteManifests.add(manifestName); + } + + deleteFiles(toDeleteManifests, manifestFile::delete); + } + + public void cleanUnusedManifests( + Snapshot snapshot, Set skippingSet, boolean deleteChangelog) { + cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet); + cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet); + if (deleteChangelog) { + cleanUnusedManifestList(snapshot.changelogManifestList(), skippingSet); } + cleanUnusedIndexManifests(snapshot, skippingSet); + cleanUnusedStatisticsManifests(snapshot, skippingSet); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java index f3c353aadd40..a0c442fcd96d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java @@ -66,9 +66,12 @@ public SnapshotDeletion( @Override public void cleanUnusedDataFiles(Snapshot snapshot, Predicate skipper) { + cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper); + } + + public void cleanUnusedDataFiles(String manifestList, Predicate skipper) { // try read manifests - List manifestFileNames = - readManifestFileNames(tryReadManifestList(snapshot.deltaManifestList())); + List manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList)); List manifestEntries; // data file path -> (original manifest entry, extra file paths) Map>> dataFileToDelete = new HashMap<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 18c95cd2f048..3697f18e123b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -46,6 +46,8 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET_KEY; +import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX; +import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN; import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; @@ -121,6 +123,15 @@ public static void validateTableSchema(TableSchema schema) { + " should not be larger than " + SNAPSHOT_NUM_RETAINED_MAX.key()); + checkArgument( + options.changelogNumRetainMin() > 0, + CHANGELOG_NUM_RETAINED_MIN.key() + " should be at least 1"); + checkArgument( + options.changelogNumRetainMin() <= options.changelogNumRetainMax(), + CHANGELOG_NUM_RETAINED_MIN.key() + + " should not be larger than " + + CHANGELOG_NUM_RETAINED_MAX.key()); + // Get the format type here which will try to convert string value to {@Code // FileFormatType}. If the string value is illegal, an exception will be thrown. CoreOptions.FileFormatType fileFormatType = options.formatType(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index f7215dc574fa..5b97b82600f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -295,6 +295,16 @@ public ExpireSnapshots newExpireSnapshots() { snapshotManager(), store().newSnapshotDeletion(), store().newTagManager(), + coreOptions().snapshotExpireCleanEmptyDirectories(), + coreOptions().changelogLifecycleDecoupled()); + } + + @Override + public ExpireSnapshots newExpireChangelog() { + return new ExpireChangelogImpl( + snapshotManager(), + tagManager(), + store().newSnapshotDeletion(), coreOptions().snapshotExpireCleanEmptyDirectories()); } @@ -308,17 +318,27 @@ public TableCommitImpl newCommit(String commitUser, String branchName) { CoreOptions options = coreOptions(); Runnable snapshotExpire = null; if (!options.writeOnly()) { + boolean changelogDecoupled = options.changelogLifecycleDecoupled(); + ExpireSnapshots expireChangelog = + newExpireChangelog() + .maxDeletes(options.snapshotExpireLimit()) + .retainMin(options.changelogNumRetainMin()) + .retainMax(options.changelogNumRetainMax()); ExpireSnapshots expireSnapshots = newExpireSnapshots() .retainMax(options.snapshotNumRetainMax()) .retainMin(options.snapshotNumRetainMin()) .maxDeletes(options.snapshotExpireLimit()); long snapshotTimeRetain = options.snapshotTimeRetain().toMillis(); + long changelogTimeRetain = options.changelogTimeRetain().toMillis(); snapshotExpire = - () -> - expireSnapshots - .olderThanMills(System.currentTimeMillis() - snapshotTimeRetain) - .expire(); + () -> { + long current = System.currentTimeMillis(); + expireSnapshots.olderThanMills(current - snapshotTimeRetain).expire(); + if (changelogDecoupled) { + expireChangelog.olderThanMills(current - changelogTimeRetain).expire(); + } + }; } return new TableCommitImpl( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java new file mode 100644 index 000000000000..138b5f3ae55b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.Changelog; +import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.HashSet; + +/** Cleanup the changelog in changelog directory. */ +public class ExpireChangelogImpl implements ExpireSnapshots { + + public static final Logger LOG = LoggerFactory.getLogger(ExpireChangelogImpl.class); + + private final SnapshotManager snapshotManager; + private final ConsumerManager consumerManager; + private final SnapshotDeletion snapshotDeletion; + private final boolean cleanEmptyDirectories; + private final TagManager tagManager; + + private long olderThanMills = 0; + public int retainMin = 1; + private int retainMax = Integer.MAX_VALUE; + private int maxDeletes = Integer.MAX_VALUE; + + public ExpireChangelogImpl( + SnapshotManager snapshotManager, + TagManager tagManager, + SnapshotDeletion snapshotDeletion, + boolean cleanEmptyDirectories) { + this.snapshotManager = snapshotManager; + this.tagManager = tagManager; + this.consumerManager = + new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + this.snapshotDeletion = snapshotDeletion; + this.cleanEmptyDirectories = cleanEmptyDirectories; + } + + @Override + public ExpireChangelogImpl retainMax(int retainMax) { + this.retainMax = retainMax; + return this; + } + + @Override + public ExpireChangelogImpl retainMin(int retainMin) { + this.retainMin = retainMin; + return this; + } + + @Override + public ExpireChangelogImpl olderThanMills(long olderThanMills) { + this.olderThanMills = olderThanMills; + return this; + } + + @Override + public ExpireChangelogImpl maxDeletes(int maxDeletes) { + this.maxDeletes = maxDeletes; + return this; + } + + @Override + public int expire() { + Long latestSnapshotId = snapshotManager.latestSnapshotId(); + if (latestSnapshotId == null) { + // no snapshot, nothing to expire + return 0; + } + + Long earliestSnapshotId = snapshotManager.earliestSnapshotId(); + if (earliestSnapshotId == null) { + return 0; + } + + Long latestChangelogId = snapshotManager.latestLongLivedChangelogId(); + if (latestChangelogId == null) { + return 0; + } + Long earliestChangelogId = snapshotManager.earliestLongLivedChangelogId(); + if (earliestChangelogId == null) { + return 0; + } + + // the min snapshot to retain from 'changelog.num-retained.max' + // (the maximum number of snapshots to retain) + long min = Math.max(latestSnapshotId - retainMax + 1, earliestChangelogId); + + // the max exclusive snapshot to expire until + // protected by 'changelog.num-retained.min' + // (the minimum number of completed snapshots to retain) + long maxExclusive = latestSnapshotId - retainMin + 1; + + // the snapshot being read by the consumer cannot be deleted + maxExclusive = + Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE)); + + // protected by 'snapshot.expire.limit' + // (the maximum number of snapshots allowed to expire at a time) + maxExclusive = Math.min(maxExclusive, earliestChangelogId + maxDeletes); + + // Only clean the snapshot in changelog dir + maxExclusive = Math.min(maxExclusive, latestChangelogId); + + for (long id = min; id <= maxExclusive; id++) { + if (snapshotManager.longLivedChangelogExists(id) + && olderThanMills <= snapshotManager.longLivedChangelog(id).timeMillis()) { + return expireUntil(earliestChangelogId, id); + } + } + return expireUntil(earliestChangelogId, maxExclusive); + } + + public int expireUntil(long earliestId, long endExclusiveId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")"); + } + + for (long id = earliestId; id < endExclusiveId; id++) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ready to delete changelog files from snapshot #" + id); + } + Changelog changelog = snapshotManager.longLivedChangelog(id); + // delete changelog files + if (changelog.changelogManifestList() != null) { + snapshotDeletion.deleteAddedDataFiles(changelog.changelogManifestList()); + snapshotDeletion.cleanUnusedManifestList( + changelog.changelogManifestList(), new HashSet<>()); + } + + snapshotManager.fileIO().deleteQuietly(snapshotManager.longLivedChangelogPath(id)); + } + + if (cleanEmptyDirectories) { + snapshotDeletion.cleanDataDirectories(); + } + writeEarliestHintFile(endExclusiveId); + return (int) (endExclusiveId - earliestId); + } + + private void writeEarliestHintFile(long earliest) { + try { + snapshotManager.commitLongLivedChangelogEarliestHint(earliest); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 00c40d0feca1..3ea9b31be5d9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.Changelog; import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.consumer.ConsumerManager; @@ -45,6 +46,8 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots { private final SnapshotDeletion snapshotDeletion; private final TagManager tagManager; private final boolean cleanEmptyDirectories; + /** Whether to clean the changelog or delta files. */ + private final boolean changelogDecoupled; private int retainMax = Integer.MAX_VALUE; private int retainMin = 1; @@ -55,13 +58,15 @@ public ExpireSnapshotsImpl( SnapshotManager snapshotManager, SnapshotDeletion snapshotDeletion, TagManager tagManager, - boolean cleanEmptyDirectories) { + boolean cleanEmptyDirectories, + boolean changelogDecoupled) { this.snapshotManager = snapshotManager; this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); this.snapshotDeletion = snapshotDeletion; this.tagManager = tagManager; this.cleanEmptyDirectories = cleanEmptyDirectories; + this.changelogDecoupled = changelogDecoupled; } @Override @@ -187,13 +192,15 @@ public int expireUntil(long earliestId, long endExclusiveId) { } // delete changelog files - for (long id = beginInclusiveId; id < endExclusiveId; id++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ready to delete changelog files from snapshot #" + id); - } - Snapshot snapshot = snapshotManager.snapshot(id); - if (snapshot.changelogManifestList() != null) { - snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList()); + if (!changelogDecoupled) { + for (long id = beginInclusiveId; id < endExclusiveId; id++) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ready to delete changelog files from snapshot #" + id); + } + Snapshot snapshot = snapshotManager.snapshot(id); + if (snapshot.changelogManifestList() != null) { + snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList()); + } } } @@ -215,9 +222,54 @@ public int expireUntil(long earliestId, long endExclusiveId) { } Snapshot snapshot = snapshotManager.snapshot(id); - snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet); - - // delete snapshot last + if (changelogDecoupled) { + Changelog changelog; + if (snapshot.changelogManifestList() != null) { + changelog = + new Changelog( + id, + snapshot.schemaId(), + null, + null, + snapshot.changelogManifestList(), + null, + snapshot.commitUser(), + snapshot.commitIdentifier(), + snapshot.commitKind(), + snapshot.timeMillis(), + snapshot.logOffsets(), + snapshot.totalRecordCount(), + null, + snapshot.changelogRecordCount(), + snapshot.watermark(), + null); + snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, false); + } else { + // no changelog + changelog = + new Changelog( + id, + snapshot.schemaId(), + null, + null, + null, + snapshot.indexManifest(), + snapshot.commitUser(), + snapshot.commitIdentifier(), + snapshot.commitKind(), + snapshot.timeMillis(), + snapshot.logOffsets(), + snapshot.changelogRecordCount(), + snapshot.totalRecordCount(), + snapshot.changelogRecordCount(), + snapshot.watermark(), + snapshot.statistics()); + snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, true); + } + commitChangelog(changelog); + } else { + snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, true); + } snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id)); } @@ -225,6 +277,15 @@ public int expireUntil(long earliestId, long endExclusiveId) { return (int) (endExclusiveId - beginInclusiveId); } + private void commitChangelog(Changelog changelog) { + try { + snapshotManager.commitChangelog(changelog, changelog.id()); + snapshotManager.commitLongLivedChangelogLatestHint(changelog.id()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private void writeEarliestHint(long earliest) { try { snapshotManager.commitEarliestHint(earliest); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index f0d52b641015..d07035f2651e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -156,4 +156,12 @@ default ExpireSnapshots newExpireSnapshots() { "Readonly Table %s does not support expireSnapshots.", this.getClass().getSimpleName())); } + + @Override + default ExpireSnapshots newExpireChangelog() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support expireChangelog.", + this.getClass().getSimpleName())); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java index b641784552e2..f7dcfaf8a971 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.Changelog; import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.manifest.ManifestEntry; @@ -33,6 +34,7 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.SortedMap; @@ -68,6 +70,7 @@ public RollbackHelper( public void cleanLargerThan(Snapshot retainedSnapshot) { // clean data files List cleanedSnapshots = cleanSnapshotsDataFiles(retainedSnapshot); + List cleanedChangelogs = cleanLongLivedChangelogDataFiles(retainedSnapshot); List cleanedTags = cleanTagsDataFiles(retainedSnapshot); // clean manifests @@ -78,6 +81,17 @@ public void cleanLargerThan(Snapshot retainedSnapshot) { snapshotDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet); } + for (Changelog changelog : cleanedChangelogs) { + if (changelog.deltaManifestList() != null) { + snapshotDeletion.cleanUnusedManifestList( + changelog.deltaManifestList(), new HashSet<>()); + } + if (changelog.changelogManifestList() != null) { + snapshotDeletion.cleanUnusedManifestList( + changelog.changelogManifestList(), new HashSet<>()); + } + } + cleanedTags.removeAll(cleanedSnapshots); for (Snapshot snapshot : cleanedTags) { tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet); @@ -121,6 +135,35 @@ private List cleanSnapshotsDataFiles(Snapshot retainedSnapshot) { return toBeCleaned; } + private List cleanLongLivedChangelogDataFiles(Snapshot retainedSnapshot) { + Long earliest = snapshotManager.earliestLongLivedChangelogId(); + Long latest = snapshotManager.latestLongLivedChangelogId(); + if (earliest == null || latest == null) { + return Collections.emptyList(); + } + + // delete snapshot files first, cannot be read now + // it is possible that some snapshots have been expired + List toBeCleaned = new ArrayList<>(); + long to = Math.max(earliest, retainedSnapshot.id() + 1); + for (long i = latest; i >= to; i--) { + toBeCleaned.add(snapshotManager.changelog(i)); + fileIO.deleteQuietly(snapshotManager.longLivedChangelogPath(i)); + } + + // delete data files of changelog + for (Changelog changelog : toBeCleaned) { + if (changelog.changelogManifestList() != null) { + snapshotDeletion.deleteAddedDataFiles(changelog.changelogManifestList()); + } + } + + // delete directories + snapshotDeletion.cleanDataDirectories(); + + return toBeCleaned; + } + private List cleanTagsDataFiles(Snapshot retainedSnapshot) { SortedMap> tags = tagManager.tags(); if (tags.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 64ab6d2ab0a7..3ed6a1990e5f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -100,6 +100,9 @@ public interface Table extends Serializable { @Experimental ExpireSnapshots newExpireSnapshots(); + @Experimental + ExpireSnapshots newExpireChangelog(); + // =============== Read & Write Operations ================== /** Returns a new read builder. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java index b597ec70bdbf..463cd6fdfa63 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java @@ -119,7 +119,10 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { Optional consumer = consumerManager.consumer(consumerId); if (consumer.isPresent()) { return new ContinuousFromSnapshotStartingScanner( - snapshotManager, consumer.get().nextSnapshot()); + snapshotManager, + consumer.get().nextSnapshot(), + options.changelogProducer() != ChangelogProducer.NONE, + options.changelogLifecycleDecoupled()); } } @@ -145,7 +148,11 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { case FROM_TIMESTAMP: Long startupMillis = options.scanTimestampMills(); return isStreaming - ? new ContinuousFromTimestampStartingScanner(snapshotManager, startupMillis) + ? new ContinuousFromTimestampStartingScanner( + snapshotManager, + startupMillis, + options.changelogProducer() != ChangelogProducer.NONE, + options.changelogLifecycleDecoupled()) : new StaticFromTimestampStartingScanner(snapshotManager, startupMillis); case FROM_FILE_CREATION_TIME: Long fileCreationTimeMills = options.scanFileCreationTimeMills(); @@ -154,7 +161,10 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { if (options.scanSnapshotId() != null) { return isStreaming ? new ContinuousFromSnapshotStartingScanner( - snapshotManager, options.scanSnapshotId()) + snapshotManager, + options.scanSnapshotId(), + options.changelogProducer() != ChangelogProducer.NONE, + options.changelogLifecycleDecoupled()) : new StaticFromSnapshotStartingScanner( snapshotManager, options.scanSnapshotId()); } else { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java index 5fc8866fd6c0..3e03dc0a1b81 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java @@ -37,6 +37,7 @@ import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult; import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner; import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.NextSnapshotFetcher; import org.apache.paimon.utils.SnapshotManager; import org.slf4j.Logger; @@ -56,6 +57,7 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan private final SnapshotManager snapshotManager; private final boolean supportStreamingReadOverwrite; private final DefaultValueAssigner defaultValueAssigner; + private final NextSnapshotFetcher nextSnapshotProvider; private boolean initialized = false; private StartingScanner startingScanner; @@ -77,6 +79,11 @@ public InnerStreamTableScanImpl( this.snapshotManager = snapshotManager; this.supportStreamingReadOverwrite = supportStreamingReadOverwrite; this.defaultValueAssigner = defaultValueAssigner; + this.nextSnapshotProvider = + new NextSnapshotFetcher( + snapshotManager, + options.changelogLifecycleDecoupled(), + options.changelogProducer() != CoreOptions.ChangelogProducer.NONE); } @Override @@ -164,24 +171,11 @@ private Plan nextPlan() { throw new EndOfScanException(); } - if (!snapshotManager.snapshotExists(nextSnapshotId)) { - Long earliestSnapshotId = snapshotManager.earliestSnapshotId(); - if (earliestSnapshotId != null && earliestSnapshotId > nextSnapshotId) { - throw new OutOfRangeException( - String.format( - "The snapshot with id %d has expired. You can: " - + "1. increase the snapshot expiration time. " - + "2. use consumer-id to ensure that unconsumed snapshots will not be expired.", - nextSnapshotId)); - } - LOG.debug( - "Next snapshot id {} does not exist, wait for the snapshot generation.", - nextSnapshotId); + Snapshot snapshot = nextSnapshotProvider.getNextSnapshot(nextSnapshotId); + if (snapshot == null) { return SnapshotNotExistPlan.INSTANCE; } - Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId); - if (boundedChecker.shouldEndInput(snapshot)) { throw new EndOfScanException(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java index c966de58c46f..38c01f35d245 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java @@ -27,19 +27,42 @@ */ public class ContinuousFromSnapshotStartingScanner extends AbstractStartingScanner { - public ContinuousFromSnapshotStartingScanner(SnapshotManager snapshotManager, long snapshotId) { + private final boolean changelogAsFollowup; + private final boolean changelogDecoupled; + + public ContinuousFromSnapshotStartingScanner( + SnapshotManager snapshotManager, + long snapshotId, + boolean changelogAsFollowup, + boolean changelogDecoupled) { super(snapshotManager); this.startingSnapshotId = snapshotId; + this.changelogDecoupled = changelogDecoupled; + this.changelogAsFollowup = changelogAsFollowup; } @Override public Result scan(SnapshotReader snapshotReader) { - Long earliestSnapshotId = snapshotManager.earliestSnapshotId(); - if (earliestSnapshotId == null) { + Long earliestId = getEarliestId(); + if (earliestId == null) { return new NoSnapshot(); } // We should return the specified snapshot as next snapshot to indicate to scan delta data // from it. If the snapshotId < earliestSnapshotId, start from the earliest. - return new NextSnapshot(Math.max(startingSnapshotId, earliestSnapshotId)); + return new NextSnapshot(Math.max(startingSnapshotId, earliestId)); + } + + private Long getEarliestId() { + Long earliestId; + if (changelogAsFollowup && changelogDecoupled) { + Long earliestChangelogId = snapshotManager.earliestLongLivedChangelogId(); + earliestId = + earliestChangelogId == null + ? snapshotManager.earliestSnapshotId() + : earliestChangelogId; + } else { + earliestId = snapshotManager.earliestSnapshotId(); + } + return earliestId; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java index 6113773ffe5a..7e39e0859781 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java @@ -34,12 +34,18 @@ public class ContinuousFromTimestampStartingScanner extends AbstractStartingScan LoggerFactory.getLogger(ContinuousFromTimestampStartingScanner.class); private final long startupMillis; + private final boolean startFromChangelog; public ContinuousFromTimestampStartingScanner( - SnapshotManager snapshotManager, long startupMillis) { + SnapshotManager snapshotManager, + long startupMillis, + boolean changelogAsFollowup, + boolean changelogDecoupled) { super(snapshotManager); this.startupMillis = startupMillis; - this.startingSnapshotId = this.snapshotManager.earlierThanTimeMills(this.startupMillis); + this.startFromChangelog = changelogAsFollowup && changelogDecoupled; + this.startingSnapshotId = + this.snapshotManager.earlierThanTimeMills(startupMillis, startFromChangelog); } @Override @@ -53,7 +59,8 @@ public StartingContext startingContext() { @Override public Result scan(SnapshotReader snapshotReader) { - Long startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis); + Long startingSnapshotId = + snapshotManager.earlierThanTimeMills(startupMillis, startFromChangelog); if (startingSnapshotId == null) { LOG.debug("There is currently no snapshot. Waiting for snapshot generation."); return new NoSnapshot(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java new file mode 100644 index 000000000000..87f1fb84984c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.source.OutOfRangeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** Fetcher for getting the next snapshot by snapshot id. */ +public class NextSnapshotFetcher { + + public static final Logger LOG = LoggerFactory.getLogger(NextSnapshotFetcher.class); + private final SnapshotManager snapshotManager; + private final boolean changelogDecoupled; + // Only support changelog as follow-up now. + private final boolean changelogAsFollowup; + + public NextSnapshotFetcher( + SnapshotManager snapshotManager, + boolean changelogDecoupled, + boolean changelogAsFollowup) { + this.snapshotManager = snapshotManager; + this.changelogDecoupled = changelogDecoupled; + this.changelogAsFollowup = changelogAsFollowup; + } + + @Nullable + public Snapshot getNextSnapshot(long nextSnapshotId) { + if (snapshotManager.snapshotExists(nextSnapshotId)) { + return snapshotManager.snapshot(nextSnapshotId); + } + + Long earliestSnapshotId = snapshotManager.earliestSnapshotId(); + // No snapshot now + if (earliestSnapshotId == null || earliestSnapshotId <= nextSnapshotId) { + LOG.debug( + "Next snapshot id {} does not exist, wait for the snapshot generation.", + nextSnapshotId); + return null; + } + + if (!changelogAsFollowup + || !changelogDecoupled + || !snapshotManager.longLivedChangelogExists(nextSnapshotId)) { + throw new OutOfRangeException( + String.format( + "The snapshot with id %d has expired. You can: " + + "1. increase the snapshot or changelog expiration time. " + + "2. use consumer-id to ensure that unconsumed snapshots will not be expired.", + nextSnapshotId)); + } + return snapshotManager.changelog(nextSnapshotId); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index b330fc30389f..834836dcba2d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -18,6 +18,7 @@ package org.apache.paimon.utils; +import org.apache.paimon.Changelog; import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; @@ -40,6 +41,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BinaryOperator; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -53,6 +55,7 @@ public class SnapshotManager implements Serializable { private static final long serialVersionUID = 1L; private static final String SNAPSHOT_PREFIX = "snapshot-"; + private static final String CHANGELOG_PREFIX = "changelog-"; public static final String EARLIEST = "EARLIEST"; public static final String LATEST = "LATEST"; private static final int READ_HINT_RETRY_NUM = 3; @@ -78,6 +81,14 @@ public Path snapshotDirectory() { return new Path(tablePath + "/snapshot"); } + public Path changelogDirectory() { + return new Path(tablePath + "/changelog"); + } + + public Path longLivedChangelogPath(long snapshotId) { + return new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId); + } + public Path snapshotPath(long snapshotId) { return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); } @@ -107,6 +118,15 @@ public Snapshot snapshot(long snapshotId) { return snapshot(DEFAULT_MAIN_BRANCH, snapshotId); } + public Changelog changelog(long snapshotId) { + Path changelogPath = longLivedChangelogPath(snapshotId); + return Changelog.fromPath(fileIO, changelogPath); + } + + public Changelog longLivedChangelog(long snapshotId) { + return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId)); + } + public Snapshot snapshot(String branchName, long snapshotId) { Path snapshotPath = snapshotPathByBranch(branchName, snapshotId); return Snapshot.fromPath(fileIO, snapshotPath); @@ -123,6 +143,17 @@ public boolean snapshotExists(long snapshotId) { } } + public boolean longLivedChangelogExists(long snapshotId) { + Path path = longLivedChangelogPath(snapshotId); + try { + return fileIO.exists(path); + } catch (IOException e) { + throw new RuntimeException( + "Failed to determine if changelog #" + snapshotId + " exists in path " + path, + e); + } + } + public @Nullable Snapshot latestSnapshot() { return latestSnapshot(DEFAULT_MAIN_BRANCH); } @@ -138,7 +169,7 @@ public boolean snapshotExists(long snapshotId) { public @Nullable Long latestSnapshotId(String branchName) { try { - return findLatest(branchName); + return findLatest(snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath); } catch (IOException e) { throw new RuntimeException("Failed to find latest snapshot id", e); } @@ -153,9 +184,31 @@ public boolean snapshotExists(long snapshotId) { return earliestSnapshotId(DEFAULT_MAIN_BRANCH); } + public @Nullable Long earliestLongLivedChangelogId() { + try { + return findEarliest( + changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath); + } catch (IOException e) { + throw new RuntimeException("Failed to find earliest changelog id", e); + } + } + + public @Nullable Long latestLongLivedChangelogId() { + try { + return findLatest(changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath); + } catch (IOException e) { + throw new RuntimeException("Failed to find latest changelog id", e); + } + } + + public @Nullable Long latestChangelogId() { + return latestSnapshotId(); + } + public @Nullable Long earliestSnapshotId(String branchName) { try { - return findEarliest(branchName); + return findEarliest( + snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath); } catch (IOException e) { throw new RuntimeException("Failed to find earliest snapshot id", e); } @@ -180,25 +233,40 @@ public boolean snapshotExists(long snapshotId) { return latestId; } + private Snapshot changelogOrSnapshot(long snapshotId) { + if (longLivedChangelogExists(snapshotId)) { + return changelog(snapshotId); + } else { + return snapshot(snapshotId); + } + } + /** * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be * returned if all snapshots are equal to or later than the timestamp mills. */ - public @Nullable Long earlierThanTimeMills(long timestampMills) { - Long earliest = earliestSnapshotId(); + public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) { + Long earliestSnapshot = earliestSnapshotId(); + Long earliest; + if (startFromChangelog) { + Long earliestChangelog = earliestLongLivedChangelogId(); + earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog; + } else { + earliest = earliestSnapshot; + } Long latest = latestSnapshotId(); if (earliest == null || latest == null) { return null; } - if (snapshot(earliest).timeMillis() >= timestampMills) { + if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) { return earliest - 1; } while (earliest < latest) { long mid = (earliest + latest + 1) / 2; - if (snapshot(mid).timeMillis() < timestampMills) { + if (changelogOrSnapshot(mid).timeMillis() < timestampMills) { earliest = mid; } else { latest = mid - 1; @@ -351,6 +419,10 @@ public List findSnapshotsForIdentifiers( return matchedSnapshots; } + public void commitChangelog(Changelog changelog, long id) throws IOException { + fileIO.writeFileUtf8(longLivedChangelogPath(id), changelog.toJson()); + } + /** * Traversal snapshots from latest to earliest safely, this is applied on the writer side * because the committer may delete obsolete snapshots, which may cause the writer to encounter @@ -393,46 +465,45 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { return null; } - private @Nullable Long findLatest(String branchName) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); - if (!fileIO.exists(snapshotDir)) { + private @Nullable Long findLatest(Path dir, String prefix, Function file) + throws IOException { + if (!fileIO.exists(dir)) { return null; } - Long snapshotId = readHint(LATEST, branchName); + Long snapshotId = readHint(LATEST, dir); if (snapshotId != null) { long nextSnapshot = snapshotId + 1; // it is the latest only there is no next one - if (!snapshotExists(nextSnapshot)) { + if (!fileIO.exists(file.apply(nextSnapshot))) { return snapshotId; } } - return findByListFiles(Math::max, branchName); + return findByListFiles(Math::max, dir, prefix); } - private @Nullable Long findEarliest(String branchName) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); - if (!fileIO.exists(snapshotDir)) { + private @Nullable Long findEarliest(Path dir, String prefix, Function file) + throws IOException { + if (!fileIO.exists(dir)) { return null; } - Long snapshotId = readHint(EARLIEST, branchName); + Long snapshotId = readHint(EARLIEST, dir); // null and it is the earliest only it exists - if (snapshotId != null && snapshotExists(snapshotId)) { + if (snapshotId != null && fileIO.exists(file.apply(snapshotId))) { return snapshotId; } - return findByListFiles(Math::min, branchName); + return findByListFiles(Math::min, dir, prefix); } public Long readHint(String fileName) { - return readHint(fileName, DEFAULT_MAIN_BRANCH); + return readHint(fileName, snapshotDirByBranch(DEFAULT_MAIN_BRANCH)); } - public Long readHint(String fileName, String branchName) { - Path snapshotDir = snapshotDirByBranch(branchName); - Path path = new Path(snapshotDir, fileName); + public Long readHint(String fileName, Path dir) { + Path path = new Path(dir, fileName); int retryNumber = 0; while (retryNumber++ < READ_HINT_RETRY_NUM) { try { @@ -449,12 +520,9 @@ public Long readHint(String fileName, String branchName) { return null; } - private Long findByListFiles(BinaryOperator reducer, String branchName) + private Long findByListFiles(BinaryOperator reducer, Path dir, String prefix) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); - return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX) - .reduce(reducer) - .orElse(null); + return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null); } public void commitLatestHint(long snapshotId) throws IOException { @@ -462,7 +530,15 @@ public void commitLatestHint(long snapshotId) throws IOException { } public void commitLatestHint(long snapshotId, String branchName) throws IOException { - commitHint(snapshotId, LATEST, branchName); + commitHint(snapshotId, LATEST, snapshotDirByBranch(branchName)); + } + + public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException { + commitHint(snapshotId, LATEST, changelogDirectory()); + } + + public void commitLongLivedChangelogEarliestHint(long snapshotId) throws IOException { + commitHint(snapshotId, EARLIEST, changelogDirectory()); } public void commitEarliestHint(long snapshotId) throws IOException { @@ -470,13 +546,11 @@ public void commitEarliestHint(long snapshotId) throws IOException { } public void commitEarliestHint(long snapshotId, String branchName) throws IOException { - commitHint(snapshotId, EARLIEST, branchName); + commitHint(snapshotId, EARLIEST, snapshotDirByBranch(branchName)); } - private void commitHint(long snapshotId, String fileName, String branchName) - throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); - Path hintFile = new Path(snapshotDir, fileName); + private void commitHint(long snapshotId, String fileName, Path dir) throws IOException { + Path hintFile = new Path(dir, fileName); fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId)); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 43e0297fd921..b20f89b940cd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -46,6 +46,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.ExpireChangelogImpl; import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.ExpireSnapshotsImpl; import org.apache.paimon.table.sink.CommitMessageImpl; @@ -150,25 +151,55 @@ public FileStoreCommitImpl newCommit() { return super.newCommit(commitUser); } + public ExpireSnapshots newExpire( + int numRetainedMin, + int numRetainedMax, + long millisRetained, + boolean snapshotExpireCleanEmptyDirectories) { + return newExpire( + numRetainedMin, + numRetainedMax, + millisRetained, + snapshotExpireCleanEmptyDirectories, + false); + } + public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long millisRetained) { - return newExpire(numRetainedMin, numRetainedMax, millisRetained, true); + return newExpire(numRetainedMin, numRetainedMax, millisRetained, true, false); } public ExpireSnapshots newExpire( int numRetainedMin, int numRetainedMax, long millisRetained, - boolean snapshotExpireCleanEmptyDirectories) { + boolean snapshotExpireCleanEmptyDirectories, + boolean changelogDecoupled) { return new ExpireSnapshotsImpl( snapshotManager(), newSnapshotDeletion(), new TagManager(fileIO, options.path()), - snapshotExpireCleanEmptyDirectories) + snapshotExpireCleanEmptyDirectories, + changelogDecoupled) .retainMax(numRetainedMax) .retainMin(numRetainedMin) .olderThanMills(System.currentTimeMillis() - millisRetained); } + public ExpireSnapshots newChangelogExpire( + int numRetainedMin, + int numRetainedMax, + long millisRetained, + boolean snapshotExpireCleanEmptyDirectories) { + return new ExpireChangelogImpl( + snapshotManager(), + new TagManager(fileIO, options.path()), + newSnapshotDeletion(), + snapshotExpireCleanEmptyDirectories) + .retainMin(numRetainedMin) + .retainMax(numRetainedMax) + .olderThanMills(System.currentTimeMillis() - millisRetained); + } + public List commitData( List kvs, Function partitionCalculator, @@ -479,7 +510,20 @@ public void assertCleaned() throws IOException { fileIO.delete(latest, false); assertThat(latestId <= snapshotManager.latestSnapshotId()).isTrue(); } - actualFiles.remove(latest); + Path changelogDir = snapshotManager.changelogDirectory(); + Path earliestChangelog = new Path(changelogDir, SnapshotManager.EARLIEST); + Path latestChangelog = new Path(changelogDir, SnapshotManager.LATEST); + + if (actualFiles.remove(earliestChangelog)) { + long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST, changelogDir); + fileIO.delete(earliest, false); + assertThat(earliestId <= snapshotManager.earliestLongLivedChangelogId()).isTrue(); + } + if (actualFiles.remove(latestChangelog)) { + long latestId = snapshotManager.readHint(SnapshotManager.LATEST, changelogDir); + fileIO.delete(latest, false); + assertThat(latestId <= snapshotManager.latestLongLivedChangelogId()).isTrue(); + } // for easier debugging String expectedString = @@ -507,7 +551,8 @@ private Set getFilesInUse() { long firstInUseSnapshotId = Snapshot.FIRST_SNAPSHOT_ID; for (long id = latestSnapshotId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) { - if (!snapshotManager.snapshotExists(id)) { + if (!snapshotManager.snapshotExists(id) + && !snapshotManager.longLivedChangelogExists(id)) { firstInUseSnapshotId = id + 1; break; } @@ -516,6 +561,7 @@ private Set getFilesInUse() { for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) { result.addAll(getFilesInUse(id)); } + return result; } @@ -538,6 +584,31 @@ public static Set getFilesInUse( ManifestList manifestList) { Set result = new HashSet<>(); + if (snapshotManager.snapshotExists(snapshotId)) { + result.addAll( + getSnapshotFileInUse( + snapshotId, snapshotManager, scan, fileIO, pathFactory, manifestList)); + } else if (snapshotManager.longLivedChangelogExists(snapshotId)) { + result.addAll( + getChangelogFileInUse( + snapshotId, snapshotManager, scan, fileIO, pathFactory, manifestList)); + } else { + throw new RuntimeException( + String.format("The snapshot %s does not exist.", snapshotId)); + } + + return result; + } + + private static Set getSnapshotFileInUse( + long snapshotId, + SnapshotManager snapshotManager, + FileStoreScan scan, + FileIO fileIO, + FileStorePathFactory pathFactory, + ManifestList manifestList) { + Set result = new HashSet<>(); + Path snapshotPath = snapshotManager.snapshotPath(snapshotId); Snapshot snapshot = Snapshot.fromPath(fileIO, snapshotPath); @@ -567,6 +638,43 @@ public static Set getFilesInUse( return result; } + private static Set getChangelogFileInUse( + long changelogId, + SnapshotManager snapshotManager, + FileStoreScan scan, + FileIO fileIO, + FileStorePathFactory pathFactory, + ManifestList manifestList) { + Set result = new HashSet<>(); + + Path changelogPath = snapshotManager.longLivedChangelogPath(changelogId); + Changelog changelog = Changelog.fromPath(fileIO, changelogPath); + + // changelog file + result.add(changelogPath); + + // manifest lists + if (changelog.changelogManifestList() != null) { + result.add(pathFactory.toManifestListPath(changelog.changelogManifestList())); + } + + // manifests + List manifests = new ArrayList<>(); + manifests.addAll(changelog.changelogManifests(manifestList)); + + manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); + + // data file + List entries = scan.withManifestList(manifests).plan().files(); + for (ManifestEntry entry : entries) { + result.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + } + return result; + } + /** Builder of {@link TestFileStore}. */ public static class Builder { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 2343aa462546..c8c7be7e12e3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -390,6 +390,41 @@ public void testExpireWithUpgradedFile() throws Exception { store.assertCleaned(); } + @Test + public void testChangelogOutLivedSnapshot() throws Exception { + List allData = new ArrayList<>(); + List snapshotPositions = new ArrayList<>(); + commit(10, allData, snapshotPositions); + ExpireSnapshots snapshot = store.newExpire(1, 2, Long.MAX_VALUE, true, true); + ExpireSnapshots changelog = store.newChangelogExpire(1, 3, Long.MAX_VALUE, true); + // expire twice to check for idempotence + snapshot.expire(); + snapshot.expire(); + + int latestSnapshotId = snapshotManager.latestSnapshotId().intValue(); + int earliestSnapshotId = snapshotManager.earliestSnapshotId().intValue(); + int latestLongLivedChangelogId = snapshotManager.latestLongLivedChangelogId().intValue(); + int earliestLongLivedChangelogId = + snapshotManager.earliestLongLivedChangelogId().intValue(); + + // 2 snapshot in /snapshot + assertThat(latestSnapshotId - earliestSnapshotId).isEqualTo(1); + assertThat(earliestLongLivedChangelogId).isEqualTo(1); + // The changelog id and snapshot id is continuous + assertThat(earliestSnapshotId - latestLongLivedChangelogId).isEqualTo(1); + + changelog.expire(); + changelog.expire(); + + assertThat(snapshotManager.latestSnapshotId().intValue()).isEqualTo(latestSnapshotId); + assertThat(snapshotManager.earliestSnapshotId().intValue()).isEqualTo(earliestSnapshotId); + assertThat(snapshotManager.latestLongLivedChangelogId()) + .isEqualTo(snapshotManager.earliestSnapshotId() - 1); + assertThat(snapshotManager.earliestLongLivedChangelogId()) + .isEqualTo(snapshotManager.earliestSnapshotId() - 1); + store.assertCleaned(); + } + private TestFileStore createStore() { ThreadLocalRandom random = ThreadLocalRandom.current(); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 94c26c455bd3..322a353f2764 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -207,6 +207,7 @@ public void testNoPartitions() throws Exception { // check after expiring store.newExpire(1, 1, Long.MAX_VALUE).expire(); + assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0)); assertPathExists(fileIO, pathFactory.bucketPath(partition, 1)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index fabd61639207..6123a9e99710 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -101,6 +101,8 @@ import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.BUCKET_KEY; +import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX; +import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN; import static org.apache.paimon.CoreOptions.COMPACTION_MAX_FILE_NUM; import static org.apache.paimon.CoreOptions.CONSUMER_IGNORE_PROGRESS; import static org.apache.paimon.CoreOptions.ExpireExecutionMode; @@ -843,6 +845,8 @@ public void testRollbackToTag(boolean expire) throws Exception { Options options = new Options(); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5); + options.set(CHANGELOG_NUM_RETAINED_MIN, 5); + options.set(CHANGELOG_NUM_RETAINED_MAX, 5); table.copy(options.toMap()).newCommit("").expireSnapshots(); } @@ -1141,6 +1145,8 @@ public void testAsyncExpireExecutionMode() throws Exception { options.put(SNAPSHOT_EXPIRE_EXECUTION_MODE.key(), ExpireExecutionMode.ASYNC.toString()); options.put(SNAPSHOT_NUM_RETAINED_MIN.key(), "1"); options.put(SNAPSHOT_NUM_RETAINED_MAX.key(), "1"); + options.put(CHANGELOG_NUM_RETAINED_MIN.key(), "1"); + options.put(CHANGELOG_NUM_RETAINED_MAX.key(), "1"); options.put(SNAPSHOT_EXPIRE_LIMIT.key(), "2"); TableCommitImpl commit = table.copy(options).newCommit(commitUser); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java index 048911152278..7608be614df7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java @@ -195,10 +195,10 @@ public void testStartFromSnapshot() throws Exception { @Test public void testTimeTravelFromExpiredSnapshot() throws Exception { Map properties = new HashMap<>(); - // retaine 2 snapshots + // retain 2 snapshots properties.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "2"); properties.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "2"); - // specify consume from a expired snapshot + // specify consume from an expired snapshot properties.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "1"); initializeTable(StartupMode.FROM_SNAPSHOT, properties); initializeTestData(); // initialize 3 commits, expired snapshot 1 diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java index 8152fe2ca9f4..5f565339f9d9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java @@ -69,7 +69,7 @@ public void testPlan() throws Exception { @Test public void testPushDownLimit() throws Exception { - createAppenOnlyTable(); + createAppendOnlyTable(); StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java index e9364eeffe38..4223e0c01a49 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java @@ -18,13 +18,20 @@ package org.apache.paimon.table.source.snapshot; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TraceableFileIO; import org.junit.jupiter.api.Test; +import java.util.UUID; + import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link ContinuousFromTimestampStartingScanner}. */ @@ -58,7 +65,8 @@ public void testScan() throws Exception { long timestamp = snapshotManager.snapshot(3).timeMillis(); ContinuousFromTimestampStartingScanner scanner = - new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp); + new ContinuousFromTimestampStartingScanner( + snapshotManager, timestamp, false, false); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); assertThat(result.nextSnapshotId()).isEqualTo(3); @@ -72,7 +80,7 @@ public void testNoSnapshot() { SnapshotManager snapshotManager = table.snapshotManager(); ContinuousFromTimestampStartingScanner scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, System.currentTimeMillis()); + snapshotManager, System.currentTimeMillis(), false, false); assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class); } @@ -92,7 +100,8 @@ public void testNoSnapshotBeforeTimestamp() throws Exception { long timestamp = snapshotManager.snapshot(1).timeMillis(); ContinuousFromTimestampStartingScanner scanner = - new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp); + new ContinuousFromTimestampStartingScanner( + snapshotManager, timestamp, false, false); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); // next snapshot @@ -101,4 +110,70 @@ public void testNoSnapshotBeforeTimestamp() throws Exception { write.close(); commit.close(); } + + @Test + public void testScanFromChangelog() throws Exception { + Options options = new Options(); + options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2); + options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); + options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT); + FileStoreTable table = + createFileStoreTable( + true, + options, + new Path( + TraceableFileIO.SCHEME + + "://" + + tempDir.toString() + + "/" + + UUID.randomUUID())); + SnapshotManager snapshotManager = table.snapshotManager(); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 20, 200L)); + write.write(rowData(1, 40, 400L)); + commit.commit(0, write.prepareCommit(true, 0)); + + Thread.sleep(50); + + write.write(rowData(1, 10, 101L)); + write.write(rowData(1, 30, 300L)); + write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L)); + commit.commit(1, write.prepareCommit(true, 1)); + + // wait for a little while + Thread.sleep(50); + + write.write(rowData(1, 10, 102L)); + write.write(rowData(1, 20, 201L)); + commit.commit(2, write.prepareCommit(true, 2)); + + assertThat(snapshotManager.latestSnapshotId()).isEqualTo(3); + assertThat(snapshotManager.earliestLongLivedChangelogId()).isEqualTo(1); + assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(2); + + ContinuousFromTimestampStartingScanner scanner = + new ContinuousFromTimestampStartingScanner( + snapshotManager, snapshotManager.snapshot(3).timeMillis(), true, true); + StartingScanner.NextSnapshot result = + (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); + assertThat(result.nextSnapshotId()).isEqualTo(3); + scanner = + new ContinuousFromTimestampStartingScanner( + snapshotManager, snapshotManager.snapshot(2).timeMillis(), true, true); + + assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) + .isEqualTo(2); + + scanner = + new ContinuousFromTimestampStartingScanner( + snapshotManager, snapshotManager.changelog(1).timeMillis(), true, true); + assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) + .isEqualTo(1); + + write.close(); + commit.close(); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java index 3e37ee0e450a..37c5a6d7d56f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java @@ -85,7 +85,7 @@ public void before() throws Exception { snapshotReader = table.newSnapshotReader(); } - protected void createAppenOnlyTable() throws Exception { + protected void createAppendOnlyTable() throws Exception { tempDir = Files.createTempDirectory("junit"); tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString()); fileIO = FileIOFinder.find(tablePath); @@ -135,19 +135,19 @@ protected String rowDataToString(InternalRow rowData) { } protected FileStoreTable createFileStoreTable() throws Exception { - return createFileStoreTable(true, new Options()); + return createFileStoreTable(true, new Options(), tablePath); } protected FileStoreTable createFileStoreTable(Options conf) throws Exception { - return createFileStoreTable(true, conf); + return createFileStoreTable(true, conf, tablePath); } protected FileStoreTable createFileStoreTable(boolean withPrimaryKeys) throws Exception { - return createFileStoreTable(withPrimaryKeys, new Options()); + return createFileStoreTable(withPrimaryKeys, new Options(), tablePath); } - protected FileStoreTable createFileStoreTable(boolean withPrimaryKeys, Options conf) - throws Exception { + protected FileStoreTable createFileStoreTable( + boolean withPrimaryKeys, Options conf, Path tablePath) throws Exception { SchemaManager schemaManager = new SchemaManager(fileIO, tablePath); List primaryKeys = new ArrayList<>(); if (withPrimaryKeys) { diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 7294e3810f3a..09fd61bde8c2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -18,11 +18,13 @@ package org.apache.paimon.utils; +import org.apache.paimon.Changelog; import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -84,7 +86,7 @@ public void testEarlierThanTimeMillis() throws IOException { // pick a random time equal to one of the snapshots time = millis.get(random.nextInt(numSnapshots)); } - Long actual = snapshotManager.earlierThanTimeMills(time); + Long actual = snapshotManager.earlierThanTimeMills(time, false); if (millis.get(numSnapshots - 1) < time) { assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1); @@ -141,6 +143,26 @@ private Snapshot createSnapshotWithMillis(long id, long millis) { null); } + private Changelog createChangelogWithMillis(long id, long millis) { + return new Changelog( + id, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + millis, + null, + null, + null, + null, + null, + null); + } + @Test public void testTraversalSnapshotsFromLatestSafely() throws IOException, InterruptedException { FileIO localFileIO = LocalFileIO.create(); @@ -234,4 +256,29 @@ public void testTraversalSnapshotsFromLatestSafely() throws IOException, Interru assertThat(exception.get()).hasMessageContaining("Fails to read snapshot from path"); } + + @Test + public void testLongLivedChangelog() throws Exception { + FileIO localFileIO = LocalFileIO.create(); + SnapshotManager snapshotManager = + new SnapshotManager(localFileIO, new Path(tempDir.toString())); + long millis = 1L; + for (long i = 1; i <= 5; i++) { + Changelog changelog = createChangelogWithMillis(i, millis + i * 1000); + localFileIO.writeFileUtf8( + snapshotManager.longLivedChangelogPath(i), changelog.toJson()); + } + + for (long i = 6; i <= 10; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000); + localFileIO.writeFileUtf8(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + + Assertions.assertThat(snapshotManager.earliestLongLivedChangelogId()).isEqualTo(1); + Assertions.assertThat(snapshotManager.latestChangelogId()).isEqualTo(10); + Assertions.assertThat(snapshotManager.latestLongLivedChangelogId()).isEqualTo(5); + Assertions.assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(6); + Assertions.assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10); + Assertions.assertThat(snapshotManager.changelog(1)).isNotNull(); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index d5db6a58819c..66a154ba4448 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -594,4 +594,46 @@ public void testScanFromOldSchema() throws Exception { DateTimeUtils.toInternal(timestamp, 0), 0))); assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, "c")); } + + @Test + public void testScanFromChangelog() throws Exception { + batchSql( + "CREATE TABLE IF NOT EXISTS T3 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)\n" + + " WITH ('changelog-producer'='input', 'bucket' = '1', \n" + + " 'snapshot.num-retained.max' = '2',\n" + + " 'snapshot.num-retained.min' = '1',\n" + + " 'changelog.num-retained.max' = '3',\n" + + " 'changelog.num-retained.min' = '1'\n" + + ")"); + + batchSql("INSERT INTO T3 VALUES ('1', '2', '3')"); + batchSql("INSERT INTO T3 VALUES ('4', '5', '6')"); + batchSql("INSERT INTO T3 VALUES ('7', '8', '9')"); + BlockingIterator iterator = + BlockingIterator.of( + streamSqlIter( + "SELECT * FROM T3 /*+ OPTIONS('scan.snapshot-id'='%s') */", 0)); + assertThat(iterator.collect(3)) + .containsExactlyInAnyOrder( + Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9")); + iterator.close(); + + batchSql("INSERT INTO T3 VALUES ('10', '11', '12')"); + + iterator = + BlockingIterator.of( + streamSqlIter( + "SELECT * FROM T3 /*+ OPTIONS('scan.snapshot-id'='%s') */", 0)); + assertThat(iterator.collect(3)) + .containsExactlyInAnyOrder( + Row.of("4", "5", "6"), Row.of("7", "8", "9"), Row.of("10", "11", "12")); + iterator.close(); + + iterator = + BlockingIterator.of( + streamSqlIter( + "SELECT * FROM T3 /*+ OPTIONS('scan.snapshot-id'='%s') */", 4)); + assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("10", "11", "12")); + iterator.close(); + } }