From b7ffdaeff049eb8b88ad8b123b708425810c6fcf Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 4 Dec 2024 18:00:41 +0800 Subject: [PATCH] 1 --- .../paimon/operation/FileStoreCommitImpl.java | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index bbd9b27ee657..34059561e73d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -716,21 +716,10 @@ private int tryCommit( ConflictCheck conflictCheck, String branchName, @Nullable String statsFileName) { - int cnt = 0; + int retryCount = 0; RetryResult retryResult = null; while (true) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); - cnt++; - if (cnt >= commitMaxRetries) { - if (retryResult != null) { - retryResult.cleanAll(); - } - throw new RuntimeException( - String.format( - "Commit failed after %s attempts, there maybe exist commit conflicts between multiple jobs.", - commitMaxRetries)); - } - CommitResult result = tryCommitOnce( retryResult, @@ -751,8 +740,19 @@ private int tryCommit( } retryResult = (RetryResult) result; + + if (retryCount >= commitMaxRetries) { + if (retryResult != null) { + retryResult.cleanAll(); + } + throw new RuntimeException( + String.format( + "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.", + commitMaxRetries)); + } + retryCount++; } - return cnt; + return retryCount + 1; } private int tryOverwrite( @@ -762,17 +762,10 @@ private int tryOverwrite( long identifier, @Nullable Long watermark, Map logOffsets) { - int cnt = 0; + int retryCount = 0; while (true) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); - cnt++; - if (cnt >= commitMaxRetries) { - throw new RuntimeException( - String.format( - "Commit failed after %s attempts, there maybe exist commit conflicts between multiple jobs.", - commitMaxRetries)); - } List changesWithOverwrite = new ArrayList<>(); List indexChangesWithOverwrite = new ArrayList<>(); if (latestSnapshot != null) { @@ -828,8 +821,16 @@ private int tryOverwrite( // TODO optimize OVERWRITE too RetryResult retryResult = (RetryResult) result; retryResult.cleanAll(); + + if (retryCount >= commitMaxRetries) { + throw new RuntimeException( + String.format( + "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.", + commitMaxRetries)); + } + retryCount++; } - return cnt; + return retryCount + 1; } @VisibleForTesting @@ -1069,22 +1070,22 @@ CommitResult tryCommitOnce( } public void compactManifest() { - int cnt = 0; + int retryCount = 0; ManifestCompactResult retryResult = null; while (true) { - cnt++; retryResult = compactManifest(retryResult); if (retryResult.isSuccess()) { break; } - if (cnt >= commitMaxRetries) { + if (retryCount >= commitMaxRetries) { retryResult.cleanAll(); throw new RuntimeException( String.format( - "Commit compact manifest failed after %s attempts, there maybe exist commit conflicts between multiple jobs.", + "Commit compact manifest failed after %s retries, there maybe exist commit conflicts between multiple jobs.", commitMaxRetries)); } + retryCount++; } }