Skip to content

Commit

Permalink
[flink] Add CompactManifestProcedure to enable compact manifest manua…
Browse files Browse the repository at this point in the history
…lly (apache#4327)
  • Loading branch information
leaves12138 authored Oct 20, 2024
1 parent aeffc89 commit 8489a2d
Show file tree
Hide file tree
Showing 9 changed files with 623 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ void overwrite(

void truncateTable(long commitIdentifier);

/** Compact the manifest entries only. */
void compactManifest();

/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,47 +1025,7 @@ CommitResult tryCommitOnce(
e);
}

boolean success;
try {
Callable<Boolean> callable =
() -> {
boolean committed =
fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(newSnapshotId);
}
return committed;
};
if (lock != null) {
success =
lock.runWithLock(
() ->
// fs.rename may not returns false if target file
// already exists, or even not atomic
// as we're relying on external locking, we can first
// check if file exist then rename to work around this
// case
!fileIO.exists(newSnapshotPath) && callable.call());
} else {
success = callable.call();
}
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
throw new RuntimeException(
String.format(
"Exception occurs when committing snapshot #%d (path %s) by user %s "
+ "with identifier %s and kind %s. "
+ "Cannot clean up because we can't determine the success.",
newSnapshotId,
newSnapshotPath,
commitUser,
identifier,
commitKind.name()),
e);
}

if (success) {
if (commitSnapshotImpl(newSnapshot, newSnapshotPath)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
Expand Down Expand Up @@ -1104,6 +1064,154 @@ CommitResult tryCommitOnce(
baseDataFiles);
}

public void compactManifest() {
int cnt = 0;
ManifestCompactResult retryResult = null;
while (true) {
cnt++;
retryResult = compactManifest(retryResult);
if (retryResult.isSuccess()) {
break;
}

if (cnt >= commitMaxRetries) {
retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit compact manifest failed after %s attempts, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
}
}

private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult lastResult) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();

if (latestSnapshot == null) {
return new SuccessManifestCompactResult();
}

List<ManifestFileMeta> mergeBeforeManifests =
manifestList.readDataManifests(latestSnapshot);
List<ManifestFileMeta> mergeAfterManifests;

if (lastResult != null) {
List<ManifestFileMeta> oldMergeBeforeManifests = lastResult.mergeBeforeManifests;
List<ManifestFileMeta> oldMergeAfterManifests = lastResult.mergeAfterManifests;

Set<String> retryMergeBefore =
oldMergeBeforeManifests.stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toSet());

List<ManifestFileMeta> manifestsFromOther =
mergeBeforeManifests.stream()
.filter(m -> !retryMergeBefore.remove(m.fileName()))
.collect(Collectors.toList());

if (retryMergeBefore.isEmpty()) {
// no manifest compact from latest failed commit to latest commit
mergeAfterManifests = new ArrayList<>(oldMergeAfterManifests);
mergeAfterManifests.addAll(manifestsFromOther);
} else {
// manifest compact happens, quit
lastResult.cleanAll();
return new SuccessManifestCompactResult();
}
} else {
// the fist trial
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
manifestTargetSize.getBytes(),
1,
1,
partitionType,
manifestReadParallelism);

if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) {
// no need to commit this snapshot, because no compact were happened
return new SuccessManifestCompactResult();
}
}

String baseManifestList = manifestList.write(mergeAfterManifests);
String deltaManifestList = manifestList.write(Collections.emptyList());

// prepare snapshot file
Snapshot newSnapshot =
new Snapshot(
latestSnapshot.id() + 1,
latestSnapshot.schemaId(),
baseManifestList,
deltaManifestList,
null,
latestSnapshot.indexManifest(),
commitUser,
Long.MAX_VALUE,
Snapshot.CommitKind.COMPACT,
System.currentTimeMillis(),
latestSnapshot.logOffsets(),
latestSnapshot.totalRecordCount(),
0L,
0L,
latestSnapshot.watermark(),
latestSnapshot.statistics());

Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshot.id())
: snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshot.id());

if (!commitSnapshotImpl(newSnapshot, newSnapshotPath)) {
return new ManifestCompactResult(
baseManifestList, deltaManifestList, mergeBeforeManifests, mergeAfterManifests);
} else {
return new SuccessManifestCompactResult();
}
}

private boolean commitSnapshotImpl(Snapshot newSnapshot, Path newSnapshotPath) {
try {
Callable<Boolean> callable =
() -> {
boolean committed =
fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(newSnapshot.id());
}
return committed;
};
if (lock != null) {
return lock.runWithLock(
() ->
// fs.rename may not returns false if target file
// already exists, or even not atomic
// as we're relying on external locking, we can first
// check if file exist then rename to work around this
// case
!fileIO.exists(newSnapshotPath) && callable.call());
} else {
return callable.call();
}
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
throw new RuntimeException(
String.format(
"Exception occurs when committing snapshot #%d (path %s) by user %s "
+ "with identifier %s and kind %s. "
+ "Cannot clean up because we can't determine the success.",
newSnapshot.id(),
newSnapshotPath,
commitUser,
newSnapshot.commitIdentifier(),
newSnapshot.commitKind().name()),
e);
}
}

private List<SimpleFileEntry> readIncrementalChanges(
Snapshot from, Snapshot to, List<BinaryRow> changedPartitions) {
List<SimpleFileEntry> entries = new ArrayList<>();
Expand Down Expand Up @@ -1489,4 +1597,48 @@ public boolean isSuccess() {
return false;
}
}

private class ManifestCompactResult implements CommitResult {

private final String baseManifestList;
private final String deltaManifestList;
private final List<ManifestFileMeta> mergeBeforeManifests;
private final List<ManifestFileMeta> mergeAfterManifests;

public ManifestCompactResult(
String baseManifestList,
String deltaManifestList,
List<ManifestFileMeta> mergeBeforeManifests,
List<ManifestFileMeta> mergeAfterManifests) {
this.baseManifestList = baseManifestList;
this.deltaManifestList = deltaManifestList;
this.mergeBeforeManifests = mergeBeforeManifests;
this.mergeAfterManifests = mergeAfterManifests;
}

public void cleanAll() {
manifestList.delete(deltaManifestList);
cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests);
}

@Override
public boolean isSuccess() {
return false;
}
}

private class SuccessManifestCompactResult extends ManifestCompactResult {

public SuccessManifestCompactResult() {
super(null, null, null, null);
}

@Override
public void cleanAll() {}

@Override
public boolean isSuccess() {
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -910,17 +911,100 @@ public void testDVIndexFiles() throws Exception {
assertThat(dvs.get("f2").isDeleted(3)).isTrue();
}

@Test
public void testManifestCompact() throws Exception {
TestFileStore store = createStore(false);

List<KeyValue> keyValues = generateDataList(1);
BinaryRow partition = gen.getPartition(keyValues.get(0));
// commit 1
Snapshot snapshot1 =
store.commitData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()).get(0);
// commit 2
Snapshot snapshot2 =
store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap())
.get(0);
// commit 3
Snapshot snapshot3 =
store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap())
.get(0);

long deleteNum =
store.manifestListFactory().create().readDataManifests(snapshot3).stream()
.mapToLong(ManifestFileMeta::numDeletedFiles)
.sum();
assertThat(deleteNum).isGreaterThan(0);
store.newCommit().compactManifest();
Snapshot latest = store.snapshotManager().latestSnapshot();
assertThat(
store.manifestListFactory().create().readDataManifests(latest).stream()
.mapToLong(ManifestFileMeta::numDeletedFiles)
.sum())
.isEqualTo(0);
}

@Test
public void testManifestCompactFull() throws Exception {
// Disable full compaction by options.
TestFileStore store =
createStore(
false,
Collections.singletonMap(
CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE.key(),
String.valueOf(Long.MAX_VALUE)));

List<KeyValue> keyValues = generateDataList(1);
BinaryRow partition = gen.getPartition(keyValues.get(0));
// commit 1
Snapshot snapshot =
store.commitData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()).get(0);

for (int i = 0; i < 100; i++) {
snapshot =
store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap())
.get(0);
}

long deleteNum =
store.manifestListFactory().create().readDataManifests(snapshot).stream()
.mapToLong(ManifestFileMeta::numDeletedFiles)
.sum();
assertThat(deleteNum).isGreaterThan(0);
store.newCommit().compactManifest();
Snapshot latest = store.snapshotManager().latestSnapshot();
assertThat(
store.manifestListFactory().create().readDataManifests(latest).stream()
.mapToLong(ManifestFileMeta::numDeletedFiles)
.sum())
.isEqualTo(0);
}

private TestFileStore createStore(boolean failing, Map<String, String> options)
throws Exception {
return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE, options);
}

private TestFileStore createStore(boolean failing) throws Exception {
return createStore(failing, 1);
}

private TestFileStore createStore(boolean failing, int numBucket) throws Exception {
return createStore(failing, numBucket, CoreOptions.ChangelogProducer.NONE);
return createStore(
failing, numBucket, CoreOptions.ChangelogProducer.NONE, Collections.emptyMap());
}

private TestFileStore createStore(
boolean failing, int numBucket, CoreOptions.ChangelogProducer changelogProducer)
throws Exception {
return createStore(failing, numBucket, changelogProducer, Collections.emptyMap());
}

private TestFileStore createStore(
boolean failing,
int numBucket,
CoreOptions.ChangelogProducer changelogProducer,
Map<String, String> options)
throws Exception {
String root =
failing
? FailingFileIO.getFailingPath(failingName, tempDir.toString())
Expand All @@ -934,7 +1018,7 @@ private TestFileStore createStore(
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
TestKeyValueGenerator.getPrimaryKeys(
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
Collections.emptyMap(),
options,
null));
return new TestFileStore.Builder(
"avro",
Expand Down
Loading

0 comments on commit 8489a2d

Please sign in to comment.