Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
仟弋 committed Oct 17, 2024
1 parent 2f81ff0 commit e9d89b6
Showing 1 changed file with 72 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1065,24 +1065,39 @@ CommitResult tryCommitOnce(
}

public void compactManifest() {
compactManifest(null);
int cnt = 0;
ManifestCompactResult retryResult = null;
while (true) {
cnt++;
retryResult = compactManifest(retryResult);
if (retryResult.isSuccess()) {
break;
}

if (cnt >= commitMaxRetries) {
retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit compact manifest failed after %s attempts, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
}
}

private void compactManifest(
@Nullable Pair<List<ManifestFileMeta>, List<ManifestFileMeta>> lastResult) {
private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult lastResult) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();

if (latestSnapshot == null) {
return;
return new SuccessManifestCompactResult();
}

List<ManifestFileMeta> mergeBeforeManifests =
manifestList.readDataManifests(latestSnapshot);
List<ManifestFileMeta> mergeAfterManifests;

if (lastResult != null) {
List<ManifestFileMeta> oldMergeBeforeManifests = lastResult.getLeft();
List<ManifestFileMeta> oldMergeAfterManifests = lastResult.getRight();
List<ManifestFileMeta> oldMergeBeforeManifests = lastResult.mergeBeforeManifests;
List<ManifestFileMeta> oldMergeAfterManifests = lastResult.mergeAfterManifests;

Set<String> retryMergeBefore =
oldMergeBeforeManifests.stream()
Expand All @@ -1100,7 +1115,8 @@ private void compactManifest(
mergeAfterManifests.addAll(manifestsFromOther);
} else {
// manifest compact happens, quit
return;
lastResult.cleanAll();
return new SuccessManifestCompactResult();
}
} else {
// the fist trial
Expand All @@ -1110,7 +1126,7 @@ private void compactManifest(
manifestFile,
manifestTargetSize.getBytes(),
1,
manifestFullCompactionSize.getBytes(),
1,
partitionType,
manifestReadParallelism);
}
Expand Down Expand Up @@ -1144,7 +1160,10 @@ private void compactManifest(
: snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshot.id());

if (!commitSnapshotImpl(newSnapshot, newSnapshotPath)) {
compactManifest(Pair.of(mergeBeforeManifests, mergeAfterManifests));
return new ManifestCompactResult(
baseManifestList, deltaManifestList, mergeBeforeManifests, mergeAfterManifests);
} else {
return new SuccessManifestCompactResult();
}
}

Expand Down Expand Up @@ -1573,4 +1592,48 @@ public boolean isSuccess() {
return false;
}
}

private class ManifestCompactResult implements CommitResult {

private final String baseManifestList;
private final String deltaManifestList;
private final List<ManifestFileMeta> mergeBeforeManifests;
private final List<ManifestFileMeta> mergeAfterManifests;

public ManifestCompactResult(
String baseManifestList,
String deltaManifestList,
List<ManifestFileMeta> mergeBeforeManifests,
List<ManifestFileMeta> mergeAfterManifests) {
this.baseManifestList = baseManifestList;
this.deltaManifestList = deltaManifestList;
this.mergeBeforeManifests = mergeBeforeManifests;
this.mergeAfterManifests = mergeAfterManifests;
}

public void cleanAll() {
manifestList.delete(deltaManifestList);
cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests);
}

@Override
public boolean isSuccess() {
return false;
}
}

private class SuccessManifestCompactResult extends ManifestCompactResult {

public SuccessManifestCompactResult() {
super(null, null, null, null);
}

@Override
public void cleanAll() {}

@Override
public boolean isSuccess() {
return true;
}
}
}

0 comments on commit e9d89b6

Please sign in to comment.