diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index eb55b327220b..74fe1c17ae79 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1935,8 +1935,8 @@ public boolean commitForceCompact() { return options.get(COMMIT_FORCE_COMPACT); } - public Optional commitMaxTimeout() { - return options.getOptional(COMMIT_MAX_TIMEOUT); + public Duration commitMaxTimeout() { + return options.get(COMMIT_MAX_TIMEOUT); } public int commitMaxRetries() { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 885d2a126baf..a8673131a157 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -135,7 +135,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final List commitCallbacks; private final StatsFileHandler statsFileHandler; private final BucketMode bucketMode; - private final Optional commitMaxTimeout; + @Nullable private Duration commitMaxTimeout; private final int commitMaxRetries; @Nullable private Lock lock; @@ -169,7 +169,7 @@ public FileStoreCommitImpl( @Nullable Integer manifestReadParallelism, List commitCallbacks, int commitMaxRetries, - Optional commitMaxTimeout) { + @Nullable Duration commitMaxTimeout) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.tableName = tableName; @@ -750,24 +750,17 @@ private int tryCommit( } retryResult = (RetryResult) result; - - if (retryCount >= commitMaxRetries) { - retryResult.cleanAll(); - throw new RuntimeException( - String.format( - "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.", - commitMaxRetries)); - } retryCount++; - if (commitMaxTimeout.isPresent() - && System.currentTimeMillis() - startMillis - > commitMaxTimeout.get().toMillis()) { + if ((commitMaxTimeout != null + && System.currentTimeMillis() - startMillis + > commitMaxTimeout.toMillis()) + || retryCount >= commitMaxRetries) { retryResult.cleanAll(); throw new RuntimeException( String.format( - "Commit failed after %s millis, there maybe exist commit conflicts between multiple jobs.", - commitMaxTimeout.get().toMillis())); + "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.", + commitMaxTimeout.toMillis(), retryCount)); } } return retryCount + 1; @@ -1074,23 +1067,17 @@ public void compactManifest() { break; } - if (retryCount >= commitMaxRetries) { - retryResult.cleanAll(); - throw new RuntimeException( - String.format( - "Commit compact manifest failed after %s retries, there maybe exist commit conflicts between multiple jobs.", - commitMaxRetries)); - } retryCount++; - if (commitMaxTimeout.isPresent() - && System.currentTimeMillis() - startMillis - > commitMaxTimeout.get().toMillis()) { + if ((commitMaxTimeout != null + && System.currentTimeMillis() - startMillis + > commitMaxTimeout.toMillis()) + || retryCount >= commitMaxRetries) { retryResult.cleanAll(); throw new RuntimeException( String.format( - "Commit failed after %s millis, there maybe exist commit conflicts between multiple jobs.", - commitMaxTimeout.get().toMillis())); + "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.", + commitMaxTimeout.toMillis(), retryCount)); } } }