diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 215d205a3ebe..c141d45ab12c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1065,15 +1065,30 @@ CommitResult tryCommitOnce( } public void compactManifest() { - compactManifest(null); + int cnt = 0; + ManifestCompactResult retryResult = null; + while (true) { + cnt++; + retryResult = compactManifest(retryResult); + if (retryResult.isSuccess()) { + break; + } + + if (cnt >= commitMaxRetries) { + retryResult.cleanAll(); + throw new RuntimeException( + String.format( + "Commit compact manifest failed after %s attempts, there maybe exist commit conflicts between multiple jobs.", + commitMaxRetries)); + } + } } - private void compactManifest( - @Nullable Pair, List> lastResult) { + private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult lastResult) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); if (latestSnapshot == null) { - return; + return new SuccessManifestCompactResult(); } List mergeBeforeManifests = @@ -1081,8 +1096,8 @@ private void compactManifest( List mergeAfterManifests; if (lastResult != null) { - List oldMergeBeforeManifests = lastResult.getLeft(); - List oldMergeAfterManifests = lastResult.getRight(); + List oldMergeBeforeManifests = lastResult.mergeBeforeManifests; + List oldMergeAfterManifests = lastResult.mergeAfterManifests; Set retryMergeBefore = oldMergeBeforeManifests.stream() @@ -1100,7 +1115,8 @@ private void compactManifest( mergeAfterManifests.addAll(manifestsFromOther); } else { // manifest compact happens, quit - return; + lastResult.cleanAll(); + return new SuccessManifestCompactResult(); } } else { // the fist trial @@ -1110,7 +1126,7 @@ private void compactManifest( manifestFile, manifestTargetSize.getBytes(), 1, - manifestFullCompactionSize.getBytes(), + 1, partitionType, manifestReadParallelism); } @@ -1144,7 +1160,10 @@ private void compactManifest( : snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshot.id()); if (!commitSnapshotImpl(newSnapshot, newSnapshotPath)) { - compactManifest(Pair.of(mergeBeforeManifests, mergeAfterManifests)); + return new ManifestCompactResult( + baseManifestList, deltaManifestList, mergeBeforeManifests, mergeAfterManifests); + } else { + return new SuccessManifestCompactResult(); } } @@ -1573,4 +1592,48 @@ public boolean isSuccess() { return false; } } + + private class ManifestCompactResult implements CommitResult { + + private final String baseManifestList; + private final String deltaManifestList; + private final List mergeBeforeManifests; + private final List mergeAfterManifests; + + public ManifestCompactResult( + String baseManifestList, + String deltaManifestList, + List mergeBeforeManifests, + List mergeAfterManifests) { + this.baseManifestList = baseManifestList; + this.deltaManifestList = deltaManifestList; + this.mergeBeforeManifests = mergeBeforeManifests; + this.mergeAfterManifests = mergeAfterManifests; + } + + public void cleanAll() { + manifestList.delete(deltaManifestList); + cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests); + } + + @Override + public boolean isSuccess() { + return false; + } + } + + private class SuccessManifestCompactResult extends ManifestCompactResult { + + public SuccessManifestCompactResult() { + super(null, null, null, null); + } + + @Override + public void cleanAll() {} + + @Override + public boolean isSuccess() { + return true; + } + } }