Skip to content

Commit

Permalink
[core] Make commit.max-retries behave as it means (apache#4641)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Dec 4, 2024
1 parent 66ca698 commit aec2547
Showing 1 changed file with 27 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -721,21 +721,10 @@ private int tryCommit(
ConflictCheck conflictCheck,
String branchName,
@Nullable String statsFileName) {
int cnt = 0;
int retryCount = 0;
RetryResult retryResult = null;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
cnt++;
if (cnt >= commitMaxRetries) {
if (retryResult != null) {
retryResult.cleanAll();
}
throw new RuntimeException(
String.format(
"Commit failed after %s attempts, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}

CommitResult result =
tryCommitOnce(
retryResult,
Expand All @@ -756,8 +745,19 @@ private int tryCommit(
}

retryResult = (RetryResult) result;

if (retryCount >= commitMaxRetries) {
if (retryResult != null) {
retryResult.cleanAll();
}
throw new RuntimeException(
String.format(
"Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
retryCount++;
}
return cnt;
return retryCount + 1;
}

private int tryOverwrite(
Expand All @@ -767,17 +767,10 @@ private int tryOverwrite(
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets) {
int cnt = 0;
int retryCount = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();

cnt++;
if (cnt >= commitMaxRetries) {
throw new RuntimeException(
String.format(
"Commit failed after %s attempts, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
if (latestSnapshot != null) {
Expand Down Expand Up @@ -833,8 +826,16 @@ private int tryOverwrite(
// TODO optimize OVERWRITE too
RetryResult retryResult = (RetryResult) result;
retryResult.cleanAll();

if (retryCount >= commitMaxRetries) {
throw new RuntimeException(
String.format(
"Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
retryCount++;
}
return cnt;
return retryCount + 1;
}

@VisibleForTesting
Expand Down Expand Up @@ -1074,22 +1075,22 @@ CommitResult tryCommitOnce(
}

public void compactManifest() {
int cnt = 0;
int retryCount = 0;
ManifestCompactResult retryResult = null;
while (true) {
cnt++;
retryResult = compactManifest(retryResult);
if (retryResult.isSuccess()) {
break;
}

if (cnt >= commitMaxRetries) {
if (retryCount >= commitMaxRetries) {
retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit compact manifest failed after %s attempts, there maybe exist commit conflicts between multiple jobs.",
"Commit compact manifest failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
retryCount++;
}
}

Expand Down

0 comments on commit aec2547

Please sign in to comment.