From aec25472882ff6e27da793e82fd56f47d6cf5eef Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Wed, 4 Dec 2024 21:15:56 +0800 Subject: [PATCH] [core] Make commit.max-retries behave as it means (#4641) --- .../paimon/operation/FileStoreCommitImpl.java | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 153f9f07e9ff..4808975fa763 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -721,21 +721,10 @@ private int tryCommit( ConflictCheck conflictCheck, String branchName, @Nullable String statsFileName) { - int cnt = 0; + int retryCount = 0; RetryResult retryResult = null; while (true) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); - cnt++; - if (cnt >= commitMaxRetries) { - if (retryResult != null) { - retryResult.cleanAll(); - } - throw new RuntimeException( - String.format( - "Commit failed after %s attempts, there maybe exist commit conflicts between multiple jobs.", - commitMaxRetries)); - } - CommitResult result = tryCommitOnce( retryResult, @@ -756,8 +745,19 @@ private int tryCommit( } retryResult = (RetryResult) result; + + if (retryCount >= commitMaxRetries) { + if (retryResult != null) { + retryResult.cleanAll(); + } + throw new RuntimeException( + String.format( + "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.", + commitMaxRetries)); + } + retryCount++; } - return cnt; + return retryCount + 1; } private int tryOverwrite( @@ -767,17 +767,10 @@ private int tryOverwrite( long identifier, @Nullable Long watermark, Map logOffsets) { - int cnt = 0; + int retryCount = 0; while (true) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); - cnt++; - if (cnt >= commitMaxRetries) { - throw new RuntimeException( - String.format( - "Commit failed after %s attempts, there maybe exist commit conflicts between multiple jobs.", - commitMaxRetries)); - } List changesWithOverwrite = new ArrayList<>(); List indexChangesWithOverwrite = new ArrayList<>(); if (latestSnapshot != null) { @@ -833,8 +826,16 @@ private int tryOverwrite( // TODO optimize OVERWRITE too RetryResult retryResult = (RetryResult) result; retryResult.cleanAll(); + + if (retryCount >= commitMaxRetries) { + throw new RuntimeException( + String.format( + "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.", + commitMaxRetries)); + } + retryCount++; } - return cnt; + return retryCount + 1; } @VisibleForTesting @@ -1074,22 +1075,22 @@ CommitResult tryCommitOnce( } public void compactManifest() { - int cnt = 0; + int retryCount = 0; ManifestCompactResult retryResult = null; while (true) { - cnt++; retryResult = compactManifest(retryResult); if (retryResult.isSuccess()) { break; } - if (cnt >= commitMaxRetries) { + if (retryCount >= commitMaxRetries) { retryResult.cleanAll(); throw new RuntimeException( String.format( - "Commit compact manifest failed after %s attempts, there maybe exist commit conflicts between multiple jobs.", + "Commit compact manifest failed after %s retries, there maybe exist commit conflicts between multiple jobs.", commitMaxRetries)); } + retryCount++; } }