Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Add CompactManifestProcedure to enable compact manifest manually #4327

Merged
merged 10 commits into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ void overwrite(

void truncateTable(long commitIdentifier);

/** Compact the manifest entries only. */
void compactManifest();
leaves12138 marked this conversation as resolved.
Show resolved Hide resolved

/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,47 +1025,7 @@ CommitResult tryCommitOnce(
e);
}

boolean success;
try {
Callable<Boolean> callable =
() -> {
boolean committed =
fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(newSnapshotId);
}
return committed;
};
if (lock != null) {
success =
lock.runWithLock(
() ->
// fs.rename may not returns false if target file
// already exists, or even not atomic
// as we're relying on external locking, we can first
// check if file exist then rename to work around this
// case
!fileIO.exists(newSnapshotPath) && callable.call());
} else {
success = callable.call();
}
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
throw new RuntimeException(
String.format(
"Exception occurs when committing snapshot #%d (path %s) by user %s "
+ "with identifier %s and kind %s. "
+ "Cannot clean up because we can't determine the success.",
newSnapshotId,
newSnapshotPath,
commitUser,
identifier,
commitKind.name()),
e);
}

if (success) {
if (commitSnapshotImpl(newSnapshot, newSnapshotPath)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
Expand Down Expand Up @@ -1104,6 +1064,149 @@ CommitResult tryCommitOnce(
baseDataFiles);
}

public void compactManifest() {
int cnt = 0;
ManifestCompactResult retryResult = null;
while (true) {
cnt++;
retryResult = compactManifest(retryResult);
if (retryResult.isSuccess()) {
break;
}

if (cnt >= commitMaxRetries) {
retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit compact manifest failed after %s attempts, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
}
}

private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult lastResult) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();

if (latestSnapshot == null) {
return new SuccessManifestCompactResult();
}

List<ManifestFileMeta> mergeBeforeManifests =
manifestList.readDataManifests(latestSnapshot);
List<ManifestFileMeta> mergeAfterManifests;

if (lastResult != null) {
List<ManifestFileMeta> oldMergeBeforeManifests = lastResult.mergeBeforeManifests;
List<ManifestFileMeta> oldMergeAfterManifests = lastResult.mergeAfterManifests;

Set<String> retryMergeBefore =
oldMergeBeforeManifests.stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toSet());

List<ManifestFileMeta> manifestsFromOther =
mergeBeforeManifests.stream()
.filter(m -> !retryMergeBefore.remove(m.fileName()))
.collect(Collectors.toList());

if (retryMergeBefore.isEmpty()) {
// no manifest compact from latest failed commit to latest commit
mergeAfterManifests = new ArrayList<>(oldMergeAfterManifests);
mergeAfterManifests.addAll(manifestsFromOther);
} else {
// manifest compact happens, quit
lastResult.cleanAll();
return new SuccessManifestCompactResult();
}
} else {
// the fist trial
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
manifestTargetSize.getBytes(),
1,
1,
partitionType,
manifestReadParallelism);
}

String baseManifestList = manifestList.write(mergeAfterManifests);
leaves12138 marked this conversation as resolved.
Show resolved Hide resolved
String deltaManifestList = manifestList.write(Collections.emptyList());

// prepare snapshot file
Snapshot newSnapshot =
new Snapshot(
latestSnapshot.id() + 1,
latestSnapshot.schemaId(),
baseManifestList,
deltaManifestList,
null,
latestSnapshot.indexManifest(),
commitUser,
Long.MAX_VALUE,
Snapshot.CommitKind.COMPACT,
System.currentTimeMillis(),
latestSnapshot.logOffsets(),
latestSnapshot.totalRecordCount(),
0L,
0L,
latestSnapshot.watermark(),
latestSnapshot.statistics());

Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshot.id())
: snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshot.id());

if (!commitSnapshotImpl(newSnapshot, newSnapshotPath)) {
return new ManifestCompactResult(
baseManifestList, deltaManifestList, mergeBeforeManifests, mergeAfterManifests);
} else {
return new SuccessManifestCompactResult();
}
}

private boolean commitSnapshotImpl(Snapshot newSnapshot, Path newSnapshotPath) {
try {
Callable<Boolean> callable =
() -> {
boolean committed =
fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(newSnapshot.id());
}
return committed;
};
if (lock != null) {
return lock.runWithLock(
() ->
// fs.rename may not returns false if target file
// already exists, or even not atomic
// as we're relying on external locking, we can first
// check if file exist then rename to work around this
// case
!fileIO.exists(newSnapshotPath) && callable.call());
} else {
return callable.call();
}
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
throw new RuntimeException(
String.format(
"Exception occurs when committing snapshot #%d (path %s) by user %s "
+ "with identifier %s and kind %s. "
+ "Cannot clean up because we can't determine the success.",
newSnapshot.id(),
newSnapshotPath,
commitUser,
newSnapshot.commitIdentifier(),
newSnapshot.commitKind().name()),
e);
}
}

private List<SimpleFileEntry> readIncrementalChanges(
Snapshot from, Snapshot to, List<BinaryRow> changedPartitions) {
List<SimpleFileEntry> entries = new ArrayList<>();
Expand Down Expand Up @@ -1489,4 +1592,48 @@ public boolean isSuccess() {
return false;
}
}

private class ManifestCompactResult implements CommitResult {

private final String baseManifestList;
private final String deltaManifestList;
private final List<ManifestFileMeta> mergeBeforeManifests;
private final List<ManifestFileMeta> mergeAfterManifests;

public ManifestCompactResult(
String baseManifestList,
String deltaManifestList,
List<ManifestFileMeta> mergeBeforeManifests,
List<ManifestFileMeta> mergeAfterManifests) {
this.baseManifestList = baseManifestList;
this.deltaManifestList = deltaManifestList;
this.mergeBeforeManifests = mergeBeforeManifests;
this.mergeAfterManifests = mergeAfterManifests;
}

public void cleanAll() {
manifestList.delete(deltaManifestList);
cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests);
}

@Override
public boolean isSuccess() {
return false;
}
}

private class SuccessManifestCompactResult extends ManifestCompactResult {

public SuccessManifestCompactResult() {
super(null, null, null, null);
}

@Override
public void cleanAll() {}

@Override
public boolean isSuccess() {
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -910,17 +911,100 @@ public void testDVIndexFiles() throws Exception {
assertThat(dvs.get("f2").isDeleted(3)).isTrue();
}

@Test
public void testManifestCompact() throws Exception {
TestFileStore store = createStore(false);

List<KeyValue> keyValues = generateDataList(1);
BinaryRow partition = gen.getPartition(keyValues.get(0));
// commit 1
Snapshot snapshot1 =
store.commitData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()).get(0);
// commit 2
Snapshot snapshot2 =
store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap())
.get(0);
// commit 3
Snapshot snapshot3 =
store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap())
.get(0);

long deleteNum =
store.manifestListFactory().create().readDataManifests(snapshot3).stream()
.mapToLong(ManifestFileMeta::numDeletedFiles)
.sum();
assertThat(deleteNum).isGreaterThan(0);
store.newCommit().compactManifest();
Snapshot latest = store.snapshotManager().latestSnapshot();
assertThat(
store.manifestListFactory().create().readDataManifests(latest).stream()
.mapToLong(ManifestFileMeta::numDeletedFiles)
.sum())
.isEqualTo(0);
}

@Test
public void testManifestCompactFull() throws Exception {
// Disable full compaction by options.
TestFileStore store =
createStore(
false,
Collections.singletonMap(
CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE.key(),
String.valueOf(Long.MAX_VALUE)));

List<KeyValue> keyValues = generateDataList(1);
BinaryRow partition = gen.getPartition(keyValues.get(0));
// commit 1
Snapshot snapshot =
store.commitData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()).get(0);

for (int i = 0; i < 100; i++) {
snapshot =
store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap())
.get(0);
}

long deleteNum =
store.manifestListFactory().create().readDataManifests(snapshot).stream()
.mapToLong(ManifestFileMeta::numDeletedFiles)
.sum();
assertThat(deleteNum).isGreaterThan(0);
store.newCommit().compactManifest();
Snapshot latest = store.snapshotManager().latestSnapshot();
assertThat(
store.manifestListFactory().create().readDataManifests(latest).stream()
.mapToLong(ManifestFileMeta::numDeletedFiles)
.sum())
.isEqualTo(0);
}

private TestFileStore createStore(boolean failing, Map<String, String> options)
throws Exception {
return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE, options);
}

private TestFileStore createStore(boolean failing) throws Exception {
return createStore(failing, 1);
}

private TestFileStore createStore(boolean failing, int numBucket) throws Exception {
return createStore(failing, numBucket, CoreOptions.ChangelogProducer.NONE);
return createStore(
failing, numBucket, CoreOptions.ChangelogProducer.NONE, Collections.emptyMap());
}

private TestFileStore createStore(
boolean failing, int numBucket, CoreOptions.ChangelogProducer changelogProducer)
throws Exception {
return createStore(failing, numBucket, changelogProducer, Collections.emptyMap());
}

private TestFileStore createStore(
boolean failing,
int numBucket,
CoreOptions.ChangelogProducer changelogProducer,
Map<String, String> options)
throws Exception {
String root =
failing
? FailingFileIO.getFailingPath(failingName, tempDir.toString())
Expand All @@ -934,7 +1018,7 @@ private TestFileStore createStore(
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
TestKeyValueGenerator.getPrimaryKeys(
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
Collections.emptyMap(),
options,
null));
return new TestFileStore.Builder(
"avro",
Expand Down
Loading
Loading