Skip to content

Commit

Permalink
[core] Optimize overwrite commit to use CommitResult to retry
Browse files (browse the repository at this point in the history)
  • Loading branch information
JingsongLi committed Dec 7, 2024
1 parent 8484bb4 commit b57fa01
Showing 1 changed file with 42 additions and 67 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -747,9 +747,7 @@ private int tryCommit(
retryResult = (RetryResult) result;

if (retryCount >= commitMaxRetries) {
if (retryResult != null) {
retryResult.cleanAll();
}
retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
Expand All @@ -767,75 +765,52 @@ private int tryOverwrite(
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets) {
int retryCount = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();

List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
if (latestSnapshot != null) {
List<ManifestEntry> currentEntries =
scan.withSnapshot(latestSnapshot)
.withPartitionFilter(partitionFilter)
.withKind(ScanMode.ALL)
.plan()
.files();
for (ManifestEntry entry : currentEntries) {
changesWithOverwrite.add(
new ManifestEntry(
FileKind.DELETE,
entry.partition(),
entry.bucket(),
entry.totalBuckets(),
entry.file()));
}
// collect all files with overwrite
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
if (latestSnapshot != null) {
List<ManifestEntry> currentEntries =
scan.withSnapshot(latestSnapshot)
.withPartitionFilter(partitionFilter)
.withKind(ScanMode.ALL)
.plan()
.files();
for (ManifestEntry entry : currentEntries) {
changesWithOverwrite.add(
new ManifestEntry(
FileKind.DELETE,
entry.partition(),
entry.bucket(),
entry.totalBuckets(),
entry.file()));
}

// collect index files
if (latestSnapshot.indexManifest() != null) {
List<IndexManifestEntry> entries =
indexManifestFile.read(latestSnapshot.indexManifest());
for (IndexManifestEntry entry : entries) {
if (partitionFilter == null || partitionFilter.test(entry.partition())) {
indexChangesWithOverwrite.add(entry.toDeleteEntry());
}
// collect index files
if (latestSnapshot.indexManifest() != null) {
List<IndexManifestEntry> entries =
indexManifestFile.read(latestSnapshot.indexManifest());
for (IndexManifestEntry entry : entries) {
if (partitionFilter == null || partitionFilter.test(entry.partition())) {
indexChangesWithOverwrite.add(entry.toDeleteEntry());
}
}
}
changesWithOverwrite.addAll(changes);
indexChangesWithOverwrite.addAll(indexFiles);

CommitResult result =
tryCommitOnce(
null,
changesWithOverwrite,
Collections.emptyList(),
indexChangesWithOverwrite,
identifier,
watermark,
logOffsets,
Snapshot.CommitKind.OVERWRITE,
latestSnapshot,
mustConflictCheck(),
branchName,
null);

if (result.isSuccess()) {
break;
}

// TODO optimize OVERWRITE too
RetryResult retryResult = (RetryResult) result;
retryResult.cleanAll();

if (retryCount >= commitMaxRetries) {
throw new RuntimeException(
String.format(
"Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
retryCount++;
}
return retryCount + 1;
changesWithOverwrite.addAll(changes);
indexChangesWithOverwrite.addAll(indexFiles);

return tryCommit(
changesWithOverwrite,
Collections.emptyList(),
indexChangesWithOverwrite,
identifier,
watermark,
logOffsets,
Snapshot.CommitKind.OVERWRITE,
mustConflictCheck(),
branchName,
null);
}

@VisibleForTesting
Expand Down

0 comments on commit b57fa01

Please sign in to comment.