diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 7d6bacccb026..52b64a3a565d 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -134,6 +134,12 @@ Integer Maximum number of retries when commit failed. + +
commit.timeout
+ (none) + Duration + Timeout duration of retry when commit failed. +
commit.user-prefix
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 8aebf2f289a0..f42bb8aeca98 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -527,6 +527,12 @@ public class CoreOptions implements Serializable { .defaultValue(false) .withDescription("Whether to force a compaction before commit."); + public static final ConfigOption COMMIT_TIMEOUT = + key("commit.timeout") + .durationType() + .noDefaultValue() + .withDescription("Timeout duration of retry when commit failed."); + public static final ConfigOption COMMIT_MAX_RETRIES = key("commit.max-retries") .intType() @@ -1929,6 +1935,12 @@ public boolean commitForceCompact() { return options.get(COMMIT_FORCE_COMPACT); } + public long commitTimeout() { + return options.get(COMMIT_TIMEOUT) == null + ? Long.MAX_VALUE + : options.get(COMMIT_TIMEOUT).toMillis(); + } + public int commitMaxRetries() { return options.get(COMMIT_MAX_RETRIES); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 54f554aa46d3..e6d63149149c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -237,7 +237,8 @@ public FileStoreCommitImpl newCommit(String commitUser, List cal bucketMode(), options.scanManifestParallelism(), callbacks, - options.commitMaxRetries()); + options.commitMaxRetries(), + options.commitTimeout()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 43faadc4d8ec..f09d3e39ae55 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -134,6 +134,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final List commitCallbacks; private final StatsFileHandler statsFileHandler; private final BucketMode bucketMode; + private long commitTimeout; private final int commitMaxRetries; @Nullable private Lock lock; @@ -166,7 +167,8 @@ public FileStoreCommitImpl( BucketMode bucketMode, @Nullable Integer manifestReadParallelism, List commitCallbacks, - int commitMaxRetries) { + int commitMaxRetries, + long commitTimeout) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.tableName = tableName; @@ -193,6 +195,7 @@ public FileStoreCommitImpl( this.manifestReadParallelism = manifestReadParallelism; this.commitCallbacks = commitCallbacks; this.commitMaxRetries = commitMaxRetries; + this.commitTimeout = commitTimeout; this.lock = null; this.ignoreEmptyCommit = true; @@ -723,6 +726,7 @@ private int tryCommit( @Nullable String statsFileName) { int retryCount = 0; RetryResult retryResult = null; + long startMillis = System.currentTimeMillis(); while (true) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); CommitResult result = @@ -746,13 +750,15 @@ private int tryCommit( retryResult = (RetryResult) result; - if (retryCount >= commitMaxRetries) { + if (System.currentTimeMillis() - startMillis > commitTimeout + || retryCount >= commitMaxRetries) { retryResult.cleanAll(); throw new RuntimeException( String.format( - "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.", - commitMaxRetries)); + "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.", + commitTimeout, retryCount)); } + retryCount++; } return retryCount + 1; @@ -1052,19 +1058,22 @@ CommitResult tryCommitOnce( public void compactManifest() { int retryCount = 0; ManifestCompactResult retryResult = null; + long startMillis = System.currentTimeMillis(); while (true) { retryResult = compactManifest(retryResult); if (retryResult.isSuccess()) { break; } - if (retryCount >= commitMaxRetries) { + if (System.currentTimeMillis() - startMillis > commitTimeout + || retryCount >= commitMaxRetries) { retryResult.cleanAll(); throw new RuntimeException( String.format( - "Commit compact manifest failed after %s retries, there maybe exist commit conflicts between multiple jobs.", - commitMaxRetries)); + "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.", + commitTimeout, retryCount)); } + retryCount++; } } diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml index d5db52cb03df..80e785353526 100644 --- a/tools/maven/checkstyle.xml +++ b/tools/maven/checkstyle.xml @@ -74,7 +74,7 @@ This file is based on the checkstyle file of Apache Beam. --> - +