Skip to content

Commit

Permalink
[flink] Add ManifestCompactProcedure to enable compact manifest manually
Browse files Browse the repository at this point in the history
  • Loading branch information
仟弋 committed Oct 15, 2024
1 parent c41d23c commit 14b14a4
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ void overwrite(

void truncateTable(long commitIdentifier);

/** Compact the manifest files. Intended to be triggered manually, e.g. from a procedure. */
void compactManifest();

/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,47 +1025,7 @@ CommitResult tryCommitOnce(
e);
}

boolean success;
try {
Callable<Boolean> callable =
() -> {
boolean committed =
fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(newSnapshotId);
}
return committed;
};
if (lock != null) {
success =
lock.runWithLock(
() ->
// fs.rename may not returns false if target file
// already exists, or even not atomic
// as we're relying on external locking, we can first
// check if file exist then rename to work around this
// case
!fileIO.exists(newSnapshotPath) && callable.call());
} else {
success = callable.call();
}
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
throw new RuntimeException(
String.format(
"Exception occurs when committing snapshot #%d (path %s) by user %s "
+ "with identifier %s and kind %s. "
+ "Cannot clean up because we can't determine the success.",
newSnapshotId,
newSnapshotPath,
commitUser,
identifier,
commitKind.name()),
e);
}

if (success) {
if (commitSnapshotImpl(newSnapshot, newSnapshotPath)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
Expand Down Expand Up @@ -1104,6 +1064,97 @@ CommitResult tryCommitOnce(
baseDataFiles);
}

/**
 * Merges the data manifests of the latest snapshot and commits the merged result as a new
 * {@code COMPACT} snapshot whose id is the latest snapshot id plus one.
 *
 * <p>Does nothing when there is no snapshot yet. Schema id, index manifest, log offsets, total
 * record count, watermark and statistics are carried over from the latest snapshot; the delta
 * manifest list of the new snapshot is empty.
 *
 * @throws RuntimeException if the new snapshot could not be committed, either because writing
 *     it threw an exception or because the snapshot file was not written (for example it
 *     already exists). Previously the latter case was silently ignored.
 */
public void compactManifest() {
    Snapshot latestSnapshot = snapshotManager.latestSnapshot();

    if (latestSnapshot == null) {
        // no snapshot, so there is nothing to compact
        return;
    }

    List<ManifestFileMeta> mergeBeforeManifests =
            manifestList.readDataManifests(latestSnapshot);

    // NOTE(review): the literal 1 is presumably the minimum number of manifests required to
    // trigger a merge, so that merging is always attempted - confirm against
    // ManifestFileMerger.merge.
    List<ManifestFileMeta> mergeAfterManifests =
            ManifestFileMerger.merge(
                    mergeBeforeManifests,
                    manifestFile,
                    manifestTargetSize.getBytes(),
                    1,
                    manifestFullCompactionSize.getBytes(),
                    partitionType,
                    manifestReadParallelism);

    // the merged manifests become the base; this commit adds no new data, so delta is empty
    String baseManifestList = manifestList.write(mergeAfterManifests);
    String deltaManifestList = manifestList.write(Collections.emptyList());

    // prepare snapshot file
    Snapshot newSnapshot =
            new Snapshot(
                    latestSnapshot.id() + 1,
                    latestSnapshot.schemaId(),
                    baseManifestList,
                    deltaManifestList,
                    null,
                    latestSnapshot.indexManifest(),
                    commitUser,
                    Long.MAX_VALUE,
                    Snapshot.CommitKind.COMPACT,
                    System.currentTimeMillis(),
                    latestSnapshot.logOffsets(),
                    latestSnapshot.totalRecordCount(),
                    0L,
                    0L,
                    latestSnapshot.watermark(),
                    latestSnapshot.statistics());

    Path newSnapshotPath =
            branchName.equals(DEFAULT_MAIN_BRANCH)
                    ? snapshotManager.snapshotPath(newSnapshot.id())
                    : snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshot.id());

    // Do not drop the commit result: a false return means the snapshot file was not written,
    // and callers would otherwise report success for a compaction that never took effect.
    if (!commitSnapshotImpl(newSnapshot, newSnapshotPath)) {
        throw new RuntimeException(
                String.format(
                        "Failed to commit manifest compaction as snapshot #%d (path %s) by user %s. "
                                + "The snapshot file was not written, it may already exist "
                                + "because of a concurrent commit. Please retry.",
                        newSnapshot.id(), newSnapshotPath, commitUser));
    }
}

/**
 * Writes the given snapshot to {@code newSnapshotPath} and, if the write succeeded, updates the
 * latest-snapshot hint.
 *
 * @param newSnapshot snapshot to persist as JSON
 * @param newSnapshotPath target path of the snapshot file
 * @return {@code true} if the snapshot file was written; {@code false} if the atomic write
 *     reported failure or, when a lock is configured, the file already existed
 * @throws RuntimeException wrapping any throwable raised while committing; no cleanup is
 *     attempted because success cannot be determined at that point
 */
private boolean commitSnapshotImpl(Snapshot newSnapshot, Path newSnapshotPath) {
    // writes the snapshot file atomically and advances the hint only after a successful write
    Callable<Boolean> writeSnapshot =
            () -> {
                if (!fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson())) {
                    return false;
                }
                snapshotManager.commitLatestHint(newSnapshot.id());
                return true;
            };

    try {
        if (lock == null) {
            return writeSnapshot.call();
        }
        return lock.runWithLock(
                () -> {
                    // fs.rename may not return false if the target file already exists,
                    // and may not even be atomic. Since we rely on external locking here,
                    // check for existence first and only then write.
                    if (fileIO.exists(newSnapshotPath)) {
                        return false;
                    }
                    return writeSnapshot.call();
                });
    } catch (Throwable e) {
        // the atomic rename failed with an exception, so the outcome is unknown
        // and nothing can safely be cleaned up
        String message =
                String.format(
                        "Exception occurs when committing snapshot #%d (path %s) by user %s "
                                + "with identifier %s and kind %s. "
                                + "Cannot clean up because we can't determine the success.",
                        newSnapshot.id(),
                        newSnapshotPath,
                        commitUser,
                        newSnapshot.commitIdentifier(),
                        newSnapshot.commitKind().name());
        throw new RuntimeException(message, e);
    }
}

private List<SimpleFileEntry> readIncrementalChanges(
Snapshot from, Snapshot to, List<BinaryRow> changedPartitions) {
List<SimpleFileEntry> entries = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -910,6 +911,38 @@ public void testDVIndexFiles() throws Exception {
assertThat(dvs.get("f2").isDeleted(3)).isTrue();
}

/**
 * Commits once, overwrites the same partition twice, and checks that the data manifests hold
 * two deleted-file entries before {@code compactManifest()} and none afterwards.
 */
@Test
public void testManifestCompact() throws Exception {
    TestFileStore store = createStore(false);

    List<KeyValue> records = generateDataList(1);
    BinaryRow partition = gen.getPartition(records.get(0));

    // snapshot 1: initial commit
    Snapshot afterCommit =
            store.commitData(records, s -> partition, kv -> 0, Collections.emptyMap()).get(0);
    // snapshot 2: first overwrite
    Snapshot afterFirstOverwrite =
            store.overwriteData(records, s -> partition, kv -> 0, Collections.emptyMap())
                    .get(0);
    // snapshot 3: second overwrite
    Snapshot afterSecondOverwrite =
            store.overwriteData(records, s -> partition, kv -> 0, Collections.emptyMap())
                    .get(0);

    // before compaction the manifests still carry the delete entries of both overwrites
    List<ManifestFileMeta> manifestsBefore =
            store.manifestListFactory().create().readDataManifests(afterSecondOverwrite);
    long deletedBefore =
            manifestsBefore.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum();
    assertThat(deletedBefore).isEqualTo(2);

    store.newCommit().compactManifest();

    // after compaction the latest snapshot must not reference any deleted files
    Snapshot latest = store.snapshotManager().latestSnapshot();
    List<ManifestFileMeta> manifestsAfter =
            store.manifestListFactory().create().readDataManifests(latest);
    long deletedAfter =
            manifestsAfter.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum();
    assertThat(deletedAfter).isEqualTo(0);
}

/**
 * Creates a {@link TestFileStore} by delegating to {@code createStore(failing, 1)}.
 *
 * @param failing forwarded unchanged to the two-argument overload
 * @return the store built by the two-argument overload
 */
private TestFileStore createStore(boolean failing) throws Exception {
return createStore(failing, 1);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Collections;

/**
 * Procedure registered under the identifier {@code manifest_compact}. It compacts the manifest
 * files of a table to reduce deleted manifest entries.
 */
public class ManifestCompactProcedure extends ProcedureBase {

    /** Value set for the commit-user-prefix option on the table copy used for the commit. */
    private static final String COMMIT_USER_PREFIX_VALUE = "Compact-Procedure-Committer";

    @Override
    public String identifier() {
        return "manifest_compact";
    }

    /**
     * Runs manifest compaction on the given table.
     *
     * @param procedureContext procedure context supplied by the caller; not used here
     * @param tableId identifier of the table to compact
     * @return a single-element array containing {@code "success"}
     * @throws Exception if resolving the table, committing, or closing the commit fails
     */
    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
    public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
        // work on a copy of the table whose commit-user prefix marks this procedure
        FileStoreTable fileStoreTable =
                (FileStoreTable)
                        table(tableId)
                                .copy(
                                        Collections.singletonMap(
                                                CoreOptions.COMMIT_USER_PREFIX.key(),
                                                COMMIT_USER_PREFIX_VALUE));

        // try-with-resources closes the commit even if compaction throws
        try (FileStoreCommit manifestCommit =
                fileStoreTable
                        .store()
                        .newCommit(fileStoreTable.coreOptions().createCommitUser())
                        .ignoreEmptyCommit(false)) {
            manifestCommit.compactManifest();
        }

        return new String[] {"success"};
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ org.apache.paimon.flink.procedure.RenameTagProcedure
org.apache.paimon.flink.procedure.FastForwardProcedure
org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
org.apache.paimon.flink.procedure.CloneProcedure
org.apache.paimon.flink.procedure.ManifestCompactProcedure
Loading

0 comments on commit 14b14a4

Please sign in to comment.