From 3347c1d764b012d01ba33a388fe046934d333baa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 15 Oct 2024 15:05:51 +0800 Subject: [PATCH 01/10] [flink] Add ManifestCompactProcedure to enable compact manifest manually --- .../paimon/operation/FileStoreCommit.java | 2 + .../paimon/operation/FileStoreCommitImpl.java | 133 ++++++++++++------ .../paimon/operation/FileStoreCommitTest.java | 33 +++++ .../procedure/ManifestCompactProcedure.java | 60 ++++++++ .../org.apache.paimon.factories.Factory | 1 + .../ManifestCompactProcedureITCase.java | 133 ++++++++++++++++++ 6 files changed, 321 insertions(+), 41 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ManifestCompactProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ManifestCompactProcedureITCase.java diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index e15225793076..5f11bd045c9e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -74,6 +74,8 @@ void overwrite( void truncateTable(long commitIdentifier); + void compactManifest(); + /** Abort an unsuccessful commit. The data files will be deleted. */ void abort(List commitMessages); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 34f928cc264d..3b8cd4be7384 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1025,47 +1025,7 @@ CommitResult tryCommitOnce( e); } - boolean success; - try { - Callable callable = - () -> { - boolean committed = - fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson()); - if (committed) { - snapshotManager.commitLatestHint(newSnapshotId); - } - return committed; - }; - if (lock != null) { - success = - lock.runWithLock( - () -> - // fs.rename may not returns false if target file - // already exists, or even not atomic - // as we're relying on external locking, we can first - // check if file exist then rename to work around this - // case - !fileIO.exists(newSnapshotPath) && callable.call()); - } else { - success = callable.call(); - } - } catch (Throwable e) { - // exception when performing the atomic rename, - // we cannot clean up because we can't determine the success - throw new RuntimeException( - String.format( - "Exception occurs when committing snapshot #%d (path %s) by user %s " - + "with identifier %s and kind %s. " - + "Cannot clean up because we can't determine the success.", - newSnapshotId, - newSnapshotPath, - commitUser, - identifier, - commitKind.name()), - e); - } - - if (success) { + if (commitSnapshotImpl(newSnapshot, newSnapshotPath)) { if (LOG.isDebugEnabled()) { LOG.debug( String.format( @@ -1104,6 +1064,97 @@ CommitResult tryCommitOnce( baseDataFiles); } + public void compactManifest() { + Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + + if (latestSnapshot == null) { + return; + } + + List mergeBeforeManifests = + manifestList.readDataManifests(latestSnapshot); + + List mergeAfterManifests = + ManifestFileMerger.merge( + mergeBeforeManifests, + manifestFile, + manifestTargetSize.getBytes(), + 1, + manifestFullCompactionSize.getBytes(), + partitionType, + manifestReadParallelism); + + String baseManifestList = manifestList.write(mergeAfterManifests); + String deltaManifestList = manifestList.write(Collections.emptyList()); + + // prepare snapshot file + Snapshot newSnapshot = + new Snapshot( + latestSnapshot.id() + 1, + latestSnapshot.schemaId(), + baseManifestList, + deltaManifestList, + null, + latestSnapshot.indexManifest(), + commitUser, + Long.MAX_VALUE, + Snapshot.CommitKind.COMPACT, + System.currentTimeMillis(), + latestSnapshot.logOffsets(), + latestSnapshot.totalRecordCount(), + 0L, + 0L, + latestSnapshot.watermark(), + latestSnapshot.statistics()); + + Path newSnapshotPath = + branchName.equals(DEFAULT_MAIN_BRANCH) + ? snapshotManager.snapshotPath(newSnapshot.id()) + : snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshot.id()); + + commitSnapshotImpl(newSnapshot, newSnapshotPath); + } + + private boolean commitSnapshotImpl(Snapshot newSnapshot, Path newSnapshotPath) { + try { + Callable callable = + () -> { + boolean committed = + fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson()); + if (committed) { + snapshotManager.commitLatestHint(newSnapshot.id()); + } + return committed; + }; + if (lock != null) { + return lock.runWithLock( + () -> + // fs.rename may not returns false if target file + // already exists, or even not atomic + // as we're relying on external locking, we can first + // check if file exist then rename to work around this + // case + !fileIO.exists(newSnapshotPath) && callable.call()); + } else { + return callable.call(); + } + } catch (Throwable e) { + // exception when performing the atomic rename, + // we cannot clean up because we can't determine the success + throw new RuntimeException( + String.format( + "Exception occurs when committing snapshot #%d (path %s) by user %s " + + "with identifier %s and kind %s. " + + "Cannot clean up because we can't determine the success.", + newSnapshot.id(), + newSnapshotPath, + commitUser, + newSnapshot.commitIdentifier(), + newSnapshot.commitKind().name()), + e); + } + } + private List readIncrementalChanges( Snapshot from, Snapshot to, List changedPartitions) { List entries = new ArrayList<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 77ee6b8e7623..63b1f1dafc34 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -33,6 +33,7 @@ import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; @@ -910,6 +911,38 @@ public void testDVIndexFiles() throws Exception { assertThat(dvs.get("f2").isDeleted(3)).isTrue(); } + @Test + public void testManifestCompact() throws Exception { + TestFileStore store = createStore(false); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + // commit 1 + Snapshot snapshot1 = + store.commitData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()).get(0); + // commit 2 + Snapshot snapshot2 = + store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()) + .get(0); + // commit 3 + Snapshot snapshot3 = + store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()) + .get(0); + + long deleteNum = + store.manifestListFactory().create().readDataManifests(snapshot3).stream() + .mapToLong(ManifestFileMeta::numDeletedFiles) + .sum(); + assertThat(deleteNum).isEqualTo(2); + store.newCommit().compactManifest(); + Snapshot latest = store.snapshotManager().latestSnapshot(); + assertThat( + store.manifestListFactory().create().readDataManifests(latest).stream() + .mapToLong(ManifestFileMeta::numDeletedFiles) + .sum()) + .isEqualTo(0); + } + private TestFileStore createStore(boolean failing) throws Exception { return createStore(failing, 1); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ManifestCompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ManifestCompactProcedure.java new file mode 100644 index 000000000000..3ee179b7dcc4 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ManifestCompactProcedure.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.operation.FileStoreCommit; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.Collections; + +/** Compact manifest file to reduce deleted manifest entries. */ +public class ManifestCompactProcedure extends ProcedureBase { + + private static final String commitUser = "Compact-Procedure-Committer"; + + @Override + public String identifier() { + return "manifest_compact"; + } + + @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))}) + public String[] call(ProcedureContext procedureContext, String tableId) throws Exception { + + FileStoreTable table = + (FileStoreTable) + table(tableId) + .copy( + Collections.singletonMap( + CoreOptions.COMMIT_USER_PREFIX.key(), commitUser)); + + try (FileStoreCommit commit = + table.store() + .newCommit(table.coreOptions().createCommitUser()) + .ignoreEmptyCommit(false)) { + commit.compactManifest(); + } + return new String[] {"success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 5014b9c3c36d..039d996338c8 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -74,3 +74,4 @@ org.apache.paimon.flink.procedure.RenameTagProcedure org.apache.paimon.flink.procedure.FastForwardProcedure org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure org.apache.paimon.flink.procedure.CloneProcedure +org.apache.paimon.flink.procedure.ManifestCompactProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ManifestCompactProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ManifestCompactProcedureITCase.java new file mode 100644 index 000000000000..02d4fe3c2c06 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ManifestCompactProcedureITCase.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.flink.CatalogITCaseBase; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Objects; + +/** IT Case for {@link ManifestCompactProcedure}. */ +public class ManifestCompactProcedureITCase extends CatalogITCaseBase { + + @Test + public void testManifestCompactProcedure() { + sql( + "CREATE TABLE T (" + + " k INT," + + " v STRING," + + " hh INT," + + " dt STRING" + + ") PARTITIONED BY (dt, hh) WITH (" + + " 'write-only' = 'true'," + + " 'file.format' = 'parquet'," + + " 'manifest.full-compaction-threshold-size' = '10000 T'," + + " 'bucket' = '-1'" + + ")"); + + sql( + "INSERT INTO T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql( + "INSERT OVERWRITE T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql( + "INSERT OVERWRITE T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql( + "INSERT OVERWRITE T VALUES (1, '101', 15, '20221208'), (4, '1001', 16, '20221208'), (5, '10001', 15, '20221209')"); + + Assertions.assertThat( + sql("SELECT sum(num_deleted_files) FROM T$manifests").get(0).getField(0)) + .isEqualTo(9L); + + Assertions.assertThat( + Objects.requireNonNull( + sql("CALL sys.manifest_compact(`table` => 'default.T')") + .get(0) + .getField(0)) + .toString()) + .isEqualTo("success"); + + Assertions.assertThat( + sql("SELECT sum(num_deleted_files) FROM T$manifests").get(0).getField(0)) + .isEqualTo(0L); + + Assertions.assertThat(sql("SELECT * FROM T ORDER BY k").toString()) + .isEqualTo( + "[+I[1, 101, 15, 20221208], +I[4, 1001, 16, 20221208], +I[5, 10001, 15, 20221209]]"); + } + + @Test + public void testManifestCompactProcedureWithBranch() { + sql( + "CREATE TABLE T (" + + " k INT," + + " v STRING," + + " hh INT," + + " dt STRING" + + ") PARTITIONED BY (dt, hh) WITH (" + + " 'write-only' = 'true'," + + " 'manifest.full-compaction-threshold-size' = '10000 T'," + + " 'bucket' = '-1'" + + ")"); + + sql( + "INSERT INTO `T` VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql("CALL sys.create_tag('default.T', 'tag1', 1)"); + + sql("call sys.create_branch('default.T', 'branch1', 'tag1')"); + + sql( + "INSERT OVERWRITE T$branch_branch1 VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql( + "INSERT OVERWRITE T$branch_branch1 VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql( + "INSERT OVERWRITE T$branch_branch1 VALUES (1, '101', 15, '20221208'), (4, '1001', 16, '20221208'), (5, '10001', 15, '20221209')"); + + Assertions.assertThat( + sql("SELECT sum(num_deleted_files) FROM T$branch_branch1$manifests") + .get(0) + .getField(0)) + .isEqualTo(9L); + + Assertions.assertThat( + Objects.requireNonNull( + sql("CALL sys.manifest_compact(`table` => 'default.T$branch_branch1')") + .get(0) + .getField(0)) + .toString()) + .isEqualTo("success"); + + Assertions.assertThat( + sql("SELECT sum(num_deleted_files) FROM T$branch_branch1$manifests") + .get(0) + .getField(0)) + .isEqualTo(0L); + + Assertions.assertThat(sql("SELECT * FROM T$branch_branch1 ORDER BY k").toString()) + .isEqualTo( + "[+I[1, 101, 15, 20221208], +I[4, 1001, 16, 20221208], +I[5, 10001, 15, 20221209]]"); + } +} From 37dad0cf49893a8329e8c6c1679b3da191f47888 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 15 Oct 2024 16:46:09 +0800 Subject: [PATCH 02/10] fix comment --- .../paimon/operation/FileStoreCommitImpl.java | 55 +++++++++++++++---- ...ure.java => CompactManifestProcedure.java} | 6 +- .../org.apache.paimon.factories.Factory | 2 +- ...va => CompactManifestProcedureITCase.java} | 8 +-- 4 files changed, 52 insertions(+), 19 deletions(-) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/{ManifestCompactProcedure.java => CompactManifestProcedure.java} (92%) rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/{ManifestCompactProcedureITCase.java => CompactManifestProcedureITCase.java} (94%) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 3b8cd4be7384..215d205a3ebe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1065,6 +1065,11 @@ CommitResult tryCommitOnce( } public void compactManifest() { + compactManifest(null); + } + + private void compactManifest( + @Nullable Pair, List> lastResult) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); if (latestSnapshot == null) { @@ -1073,16 +1078,42 @@ public void compactManifest() { List mergeBeforeManifests = manifestList.readDataManifests(latestSnapshot); - - List mergeAfterManifests = - ManifestFileMerger.merge( - mergeBeforeManifests, - manifestFile, - manifestTargetSize.getBytes(), - 1, - manifestFullCompactionSize.getBytes(), - partitionType, - manifestReadParallelism); + List mergeAfterManifests; + + if (lastResult != null) { + List oldMergeBeforeManifests = lastResult.getLeft(); + List oldMergeAfterManifests = lastResult.getRight(); + + Set retryMergeBefore = + oldMergeBeforeManifests.stream() + .map(ManifestFileMeta::fileName) + .collect(Collectors.toSet()); + + List manifestsFromOther = + mergeBeforeManifests.stream() + .filter(m -> !retryMergeBefore.remove(m.fileName())) + .collect(Collectors.toList()); + + if (retryMergeBefore.isEmpty()) { + // no manifest compact from latest failed commit to latest commit + mergeAfterManifests = new ArrayList<>(oldMergeAfterManifests); + mergeAfterManifests.addAll(manifestsFromOther); + } else { + // manifest compact happens, quit + return; + } + } else { + // the fist trial + mergeAfterManifests = + ManifestFileMerger.merge( + mergeBeforeManifests, + manifestFile, + manifestTargetSize.getBytes(), + 1, + manifestFullCompactionSize.getBytes(), + partitionType, + manifestReadParallelism); + } String baseManifestList = manifestList.write(mergeAfterManifests); String deltaManifestList = manifestList.write(Collections.emptyList()); @@ -1112,7 +1143,9 @@ public void compactManifest() { ? snapshotManager.snapshotPath(newSnapshot.id()) : snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshot.id()); - commitSnapshotImpl(newSnapshot, newSnapshotPath); + if (!commitSnapshotImpl(newSnapshot, newSnapshotPath)) { + compactManifest(Pair.of(mergeBeforeManifests, mergeAfterManifests)); + } } private boolean commitSnapshotImpl(Snapshot newSnapshot, Path newSnapshotPath) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ManifestCompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java similarity index 92% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ManifestCompactProcedure.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java index 3ee179b7dcc4..e1807191defe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ManifestCompactProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java @@ -30,13 +30,13 @@ import java.util.Collections; /** Compact manifest file to reduce deleted manifest entries. */ -public class ManifestCompactProcedure extends ProcedureBase { +public class CompactManifestProcedure extends ProcedureBase { - private static final String commitUser = "Compact-Procedure-Committer"; + private static final String commitUser = "Compact-Manifest-Procedure-Committer"; @Override public String identifier() { - return "manifest_compact"; + return "compact_manifest"; } @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))}) diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 039d996338c8..2cf57201d6ae 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -74,4 +74,4 @@ org.apache.paimon.flink.procedure.RenameTagProcedure org.apache.paimon.flink.procedure.FastForwardProcedure org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure org.apache.paimon.flink.procedure.CloneProcedure -org.apache.paimon.flink.procedure.ManifestCompactProcedure +org.apache.paimon.flink.procedure.CompactManifestProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ManifestCompactProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java similarity index 94% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ManifestCompactProcedureITCase.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java index 02d4fe3c2c06..89cdc48d85a0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ManifestCompactProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java @@ -25,8 +25,8 @@ import java.util.Objects; -/** IT Case for {@link ManifestCompactProcedure}. */ -public class ManifestCompactProcedureITCase extends CatalogITCaseBase { +/** IT Case for {@link CompactManifestProcedure}. */ +public class CompactManifestProcedureITCase extends CatalogITCaseBase { @Test public void testManifestCompactProcedure() { @@ -61,7 +61,7 @@ public void testManifestCompactProcedure() { Assertions.assertThat( Objects.requireNonNull( - sql("CALL sys.manifest_compact(`table` => 'default.T')") + sql("CALL sys.compact_manifest(`table` => 'default.T')") .get(0) .getField(0)) .toString()) @@ -114,7 +114,7 @@ public void testManifestCompactProcedureWithBranch() { Assertions.assertThat( Objects.requireNonNull( - sql("CALL sys.manifest_compact(`table` => 'default.T$branch_branch1')") + sql("CALL sys.compact_manifest(`table` => 'default.T$branch_branch1')") .get(0) .getField(0)) .toString()) From 1d996fd765267af0eb4ef80073720987c58fdee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 16 Oct 2024 16:31:55 +0800 Subject: [PATCH 03/10] fix comment --- .../procedure/CompactManifestProcedure.java | 4 +- .../apache/paimon/spark/SparkProcedures.java | 2 + .../procedure/CompactManifestProcedure.java | 95 +++++++++++++++++++ .../CompactManifestProcedureTest.scala | 50 ++++++++++ 4 files changed, 149 insertions(+), 2 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java index e1807191defe..3e52322a6f58 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java @@ -32,7 +32,7 @@ /** Compact manifest file to reduce deleted manifest entries. */ public class CompactManifestProcedure extends ProcedureBase { - private static final String commitUser = "Compact-Manifest-Procedure-Committer"; + private static final String COMMIT_USER = "Compact-Manifest-Procedure-Committer"; @Override public String identifier() { @@ -47,7 +47,7 @@ public String[] call(ProcedureContext procedureContext, String tableId) throws E table(tableId) .copy( Collections.singletonMap( - CoreOptions.COMMIT_USER_PREFIX.key(), commitUser)); + CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER)); try (FileStoreCommit commit = table.store() diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index d2a7180413b7..dee0c38d46fb 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -18,6 +18,7 @@ package org.apache.paimon.spark; +import org.apache.paimon.spark.procedure.CompactManifestProcedure; import org.apache.paimon.spark.procedure.CompactProcedure; import org.apache.paimon.spark.procedure.CreateBranchProcedure; import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure; @@ -81,6 +82,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("fast_forward", FastForwardProcedure::builder); procedureBuilders.put("reset_consumer", ResetConsumerProcedure::builder); procedureBuilders.put("mark_partition_done", MarkPartitionDoneProcedure::builder); + procedureBuilders.put("compact_manifest", CompactManifestProcedure::builder); return procedureBuilders.build(); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java new file mode 100644 index 000000000000..dd064d892c3d --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.operation.FileStoreCommit; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** + * Compact manifest procedure. Usage: + * + *

+ *  CALL sys.compact_manifest(table => 'tableId')
+ * 
+ */ +public class CompactManifestProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] {ProcedureParameter.required("table", StringType)}; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + }); + + protected CompactManifestProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + FileStoreTable table = (FileStoreTable) loadSparkTable(tableIdent).getTable(); + + try (FileStoreCommit commit = + table.store() + .newCommit(table.coreOptions().createCommitUser()) + .ignoreEmptyCommit(false)) { + commit.compactManifest(); + } + + return new InternalRow[] {newInternalRow(true)}; + } + + @Override + public String description() { + return "This procedure execute compact action on paimon table."; + } + + public static ProcedureBuilder builder() { + return new Builder() { + @Override + public CompactManifestProcedure doBuild() { + return new CompactManifestProcedure(tableCatalog()); + } + }; + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala new file mode 100644 index 000000000000..c1c90251338f --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.streaming.StreamTest +import org.assertj.core.api.Assertions + +/** Test compact manifest procedure. See [[CompactManifestProcedure]]. */ +class CompactManifestProcedureTest extends PaimonSparkTestBase with StreamTest { + + test("Paimon Procedure: compact manifest") { + spark.sql( + s""" + |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT) + |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.min.file-num'='2', 'compaction.max.file-num'='2') + |PARTITIONED BY (dt, hh) + |""".stripMargin) + + spark.sql(s"INSERT INTO T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + spark.sql(s"INSERT OVERWRITE T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + spark.sql(s"INSERT OVERWRITE T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + spark.sql(s"INSERT OVERWRITE T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + + Thread.sleep(10000); + + var rows = spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`").collectAsList() + Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(6L) + spark.sql("CALL sys.compact_manifest(table => 'T')") + rows = spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`").collectAsList() + Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(0L) + } +} From 2f81ff025f2c1a0e75ba672570534aae75f9563c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 16 Oct 2024 16:33:32 +0800 Subject: [PATCH 04/10] fix comment --- .../main/java/org/apache/paimon/operation/FileStoreCommit.java | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index 5f11bd045c9e..43456cbe7184 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -74,6 +74,7 @@ void overwrite( void truncateTable(long commitIdentifier); + /** Compact the manifest entries only. */ void compactManifest(); /** Abort an unsuccessful commit. The data files will be deleted. */ From e9d89b632f3b845d8bd9d1da211979be17831571 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 17 Oct 2024 14:55:07 +0800 Subject: [PATCH 05/10] fix comment --- .../paimon/operation/FileStoreCommitImpl.java | 81 ++++++++++++++++--- 1 file changed, 72 insertions(+), 9 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 215d205a3ebe..c141d45ab12c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1065,15 +1065,30 @@ CommitResult tryCommitOnce( } public void compactManifest() { - compactManifest(null); + int cnt = 0; + ManifestCompactResult retryResult = null; + while (true) { + cnt++; + retryResult = compactManifest(retryResult); + if (retryResult.isSuccess()) { + break; + } + + if (cnt >= commitMaxRetries) { + retryResult.cleanAll(); + throw new RuntimeException( + String.format( + "Commit compact manifest failed after %s attempts, there maybe exist commit conflicts between multiple jobs.", + commitMaxRetries)); + } + } } - private void compactManifest( - @Nullable Pair, List> lastResult) { + private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult lastResult) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); if (latestSnapshot == null) { - return; + return new SuccessManifestCompactResult(); } List mergeBeforeManifests = @@ -1081,8 +1096,8 @@ private void compactManifest( List mergeAfterManifests; if (lastResult != null) { - List oldMergeBeforeManifests = lastResult.getLeft(); - List oldMergeAfterManifests = lastResult.getRight(); + List oldMergeBeforeManifests = lastResult.mergeBeforeManifests; + List oldMergeAfterManifests = lastResult.mergeAfterManifests; Set retryMergeBefore = oldMergeBeforeManifests.stream() @@ -1100,7 +1115,8 @@ private void compactManifest( mergeAfterManifests.addAll(manifestsFromOther); } else { // manifest compact happens, quit - return; + lastResult.cleanAll(); + return new SuccessManifestCompactResult(); } } else { // the fist trial @@ -1110,7 +1126,7 @@ private void compactManifest( manifestFile, manifestTargetSize.getBytes(), 1, - manifestFullCompactionSize.getBytes(), + 1, partitionType, manifestReadParallelism); } @@ -1144,7 +1160,10 @@ private void compactManifest( : snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshot.id()); if (!commitSnapshotImpl(newSnapshot, newSnapshotPath)) { - compactManifest(Pair.of(mergeBeforeManifests, mergeAfterManifests)); + return new ManifestCompactResult( + baseManifestList, deltaManifestList, mergeBeforeManifests, mergeAfterManifests); + } else { + return new SuccessManifestCompactResult(); } } @@ -1573,4 +1592,48 @@ public boolean isSuccess() { return false; } } + + private class ManifestCompactResult implements CommitResult { + + private final String baseManifestList; + private final String deltaManifestList; + private final List mergeBeforeManifests; + private final List mergeAfterManifests; + + public ManifestCompactResult( + String baseManifestList, + String deltaManifestList, + List mergeBeforeManifests, + List mergeAfterManifests) { + this.baseManifestList = baseManifestList; + this.deltaManifestList = deltaManifestList; + this.mergeBeforeManifests = mergeBeforeManifests; + this.mergeAfterManifests = mergeAfterManifests; + } + + public void cleanAll() { + manifestList.delete(deltaManifestList); + cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests); + } + + @Override + public boolean isSuccess() { + return false; + } + } + + private class SuccessManifestCompactResult extends ManifestCompactResult { + + public SuccessManifestCompactResult() { + super(null, null, null, null); + } + + @Override + public void cleanAll() {} + + @Override + public boolean isSuccess() { + return true; + } + } } From 1b13af35d65b5c0f718402b253d66959f0d9c4ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 17 Oct 2024 15:03:23 +0800 Subject: [PATCH 06/10] fix comment --- .../paimon/operation/FileStoreCommitTest.java | 55 ++++++++++++++++++- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 63b1f1dafc34..c78d9d7d636c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -943,17 +943,68 @@ public void testManifestCompact() throws Exception { .isEqualTo(0); } + @Test + public void testManifestCompactFull() throws Exception { + // Disable full compaction by options. + TestFileStore store = + createStore( + false, + Collections.singletonMap( + CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE.key(), + String.valueOf(Long.MAX_VALUE))); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + // commit 1 + Snapshot snapshot = + store.commitData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()).get(0); + + for (int i = 0; i < 10; i++) { + snapshot = + store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()) + .get(0); + } + + long deleteNum = + store.manifestListFactory().create().readDataManifests(snapshot).stream() + .mapToLong(ManifestFileMeta::numDeletedFiles) + .sum(); + assertThat(deleteNum).isGreaterThan(0); + store.newCommit().compactManifest(); + Snapshot latest = store.snapshotManager().latestSnapshot(); + assertThat( + store.manifestListFactory().create().readDataManifests(latest).stream() + .mapToLong(ManifestFileMeta::numDeletedFiles) + .sum()) + .isEqualTo(0); + } + + private TestFileStore createStore(boolean failing, Map options) + throws Exception { + return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE, options); + } + private TestFileStore createStore(boolean failing) throws Exception { return createStore(failing, 1); } private TestFileStore createStore(boolean failing, int numBucket) throws Exception { - return createStore(failing, numBucket, CoreOptions.ChangelogProducer.NONE); + return createStore( + failing, numBucket, CoreOptions.ChangelogProducer.NONE, Collections.emptyMap()); } private TestFileStore createStore( boolean failing, int numBucket, CoreOptions.ChangelogProducer changelogProducer) throws Exception { + return createStore(failing, numBucket, changelogProducer, Collections.emptyMap()); + } + + private TestFileStore createStore( + boolean failing, + int numBucket, + CoreOptions.ChangelogProducer changelogProducer, + Map options) + throws Exception { String root = failing ? FailingFileIO.getFailingPath(failingName, tempDir.toString()) @@ -967,7 +1018,7 @@ private TestFileStore createStore( TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), TestKeyValueGenerator.getPrimaryKeys( TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), - Collections.emptyMap(), + options, null)); return new TestFileStore.Builder( "avro", From fc984e1cf1d863cebe778edf16f141580acaee08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 17 Oct 2024 15:12:04 +0800 Subject: [PATCH 07/10] fix comment --- .../java/org/apache/paimon/operation/FileStoreCommitTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index c78d9d7d636c..c2d3c9e2a949 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -959,7 +959,7 @@ public void testManifestCompactFull() throws Exception { Snapshot snapshot = store.commitData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()).get(0); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 100; i++) { snapshot = store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()) .get(0); From e7771c9262deb0d5ed07f29f272ab04c65c9644d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 17 Oct 2024 15:31:27 +0800 Subject: [PATCH 08/10] fix comment --- .../java/org/apache/paimon/operation/FileStoreCommitTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index c2d3c9e2a949..67945df60ce8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -933,7 +933,7 @@ public void testManifestCompact() throws Exception { store.manifestListFactory().create().readDataManifests(snapshot3).stream() .mapToLong(ManifestFileMeta::numDeletedFiles) .sum(); - assertThat(deleteNum).isEqualTo(2); + assertThat(deleteNum).isGreaterThan(0); store.newCommit().compactManifest(); Snapshot latest = store.snapshotManager().latestSnapshot(); assertThat( From 0a86ed66f97e2cd0c91619cb5f050a234f41a03f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 17 Oct 2024 17:06:47 +0800 Subject: [PATCH 09/10] fix comment --- .../org/apache/paimon/operation/FileStoreCommitImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index c141d45ab12c..86f57c5d7c53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1129,6 +1129,12 @@ private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult la 1, partitionType, manifestReadParallelism); + + if (mergeBeforeManifests.size() == mergeAfterManifests.size() + && new HashSet<>(mergeBeforeManifests).containsAll(mergeAfterManifests)) { + // no need to commit this snapshot, because no compact were happened + return new SuccessManifestCompactResult(); + } } String baseManifestList = manifestList.write(mergeAfterManifests); From 1a98423537b2cc37bf8d7cf4167e45cd819a9b8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 17 Oct 2024 18:00:53 +0800 Subject: [PATCH 10/10] fix comment --- .../java/org/apache/paimon/operation/FileStoreCommitImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 86f57c5d7c53..9ce089992350 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1130,8 +1130,7 @@ private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult la partitionType, manifestReadParallelism); - if (mergeBeforeManifests.size() == mergeAfterManifests.size() - && new HashSet<>(mergeBeforeManifests).containsAll(mergeAfterManifests)) { + if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) { // no need to commit this snapshot, because no compact were happened return new SuccessManifestCompactResult(); }