Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce timeout for commit retry avoid long time loop #4668

Merged
merged 12 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@
<td>Integer</td>
<td>Maximum number of retries when commit failed.</td>
</tr>
<tr>
<td><h5>commit.timeout</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>Timeout duration of retry when commit failed.</td>
</tr>
<tr>
<td><h5>commit.user-prefix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to force a compaction before commit.");

public static final ConfigOption<Duration> COMMIT_TIMEOUT =
key("commit.timeout")
.durationType()
.noDefaultValue()
.withDescription("Timeout duration of retry when commit failed.");

public static final ConfigOption<Integer> COMMIT_MAX_RETRIES =
key("commit.max-retries")
.intType()
Expand Down Expand Up @@ -1929,6 +1935,12 @@ public boolean commitForceCompact() {
return options.get(COMMIT_FORCE_COMPACT);
}

public long commitTimeout() {
return options.get(COMMIT_TIMEOUT) == null
? Long.MAX_VALUE
: options.get(COMMIT_TIMEOUT).toMillis();
}

public int commitMaxRetries() {
return options.get(COMMIT_MAX_RETRIES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> cal
bucketMode(),
options.scanManifestParallelism(),
callbacks,
options.commitMaxRetries());
options.commitMaxRetries(),
options.commitTimeout());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final List<CommitCallback> commitCallbacks;
private final StatsFileHandler statsFileHandler;
private final BucketMode bucketMode;
@Nullable private long commitTimeout;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove @nullable

private final int commitMaxRetries;

@Nullable private Lock lock;
Expand Down Expand Up @@ -166,7 +167,8 @@ public FileStoreCommitImpl(
BucketMode bucketMode,
@Nullable Integer manifestReadParallelism,
List<CommitCallback> commitCallbacks,
int commitMaxRetries) {
int commitMaxRetries,
@Nullable long commitTimeout) {
Copy link
Contributor

@JingsongLi JingsongLi Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove @nullable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks for reminder @JingsongLi

this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.tableName = tableName;
Expand All @@ -193,6 +195,7 @@ public FileStoreCommitImpl(
this.manifestReadParallelism = manifestReadParallelism;
this.commitCallbacks = commitCallbacks;
this.commitMaxRetries = commitMaxRetries;
this.commitTimeout = commitTimeout;

this.lock = null;
this.ignoreEmptyCommit = true;
Expand Down Expand Up @@ -723,6 +726,7 @@ private int tryCommit(
@Nullable String statsFileName) {
int retryCount = 0;
RetryResult retryResult = null;
long startMillis = System.currentTimeMillis();
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
CommitResult result =
Expand All @@ -746,13 +750,15 @@ private int tryCommit(

retryResult = (RetryResult) result;

if (retryCount >= commitMaxRetries) {
if (System.currentTimeMillis() - startMillis > commitTimeout
|| retryCount >= commitMaxRetries) {
retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
"Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",
commitTimeout, retryCount));
}

retryCount++;
}
return retryCount + 1;
Expand Down Expand Up @@ -1052,19 +1058,22 @@ CommitResult tryCommitOnce(
public void compactManifest() {
int retryCount = 0;
ManifestCompactResult retryResult = null;
long startMillis = System.currentTimeMillis();
while (true) {
retryResult = compactManifest(retryResult);
if (retryResult.isSuccess()) {
break;
}

if (retryCount >= commitMaxRetries) {
if (System.currentTimeMillis() - startMillis > commitTimeout
|| retryCount >= commitMaxRetries) {
retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit compact manifest failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
"Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",
commitTimeout, retryCount));
}

retryCount++;
}
}
Expand Down
2 changes: 1 addition & 1 deletion tools/maven/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ This file is based on the checkstyle file of Apache Beam.
-->

<module name="FileLength">
<property name="max" value="3000"/>
<property name="max" value="4000"/>
</module>

<!-- All Java AST specific tests live under TreeWalker module. -->
Expand Down
Loading