diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 4808975fa763..43faadc4d8ec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -747,9 +747,7 @@ private int tryCommit( retryResult = (RetryResult) result; if (retryCount >= commitMaxRetries) { - if (retryResult != null) { - retryResult.cleanAll(); - } + retryResult.cleanAll(); throw new RuntimeException( String.format( "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.", @@ -767,75 +765,52 @@ private int tryOverwrite( long identifier, @Nullable Long watermark, Map logOffsets) { - int retryCount = 0; - while (true) { - Snapshot latestSnapshot = snapshotManager.latestSnapshot(); - - List changesWithOverwrite = new ArrayList<>(); - List indexChangesWithOverwrite = new ArrayList<>(); - if (latestSnapshot != null) { - List currentEntries = - scan.withSnapshot(latestSnapshot) - .withPartitionFilter(partitionFilter) - .withKind(ScanMode.ALL) - .plan() - .files(); - for (ManifestEntry entry : currentEntries) { - changesWithOverwrite.add( - new ManifestEntry( - FileKind.DELETE, - entry.partition(), - entry.bucket(), - entry.totalBuckets(), - entry.file())); - } + // collect all files with overwrite + Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + List changesWithOverwrite = new ArrayList<>(); + List indexChangesWithOverwrite = new ArrayList<>(); + if (latestSnapshot != null) { + List currentEntries = + scan.withSnapshot(latestSnapshot) + .withPartitionFilter(partitionFilter) + .withKind(ScanMode.ALL) + .plan() + .files(); + for (ManifestEntry entry : currentEntries) { + changesWithOverwrite.add( + new ManifestEntry( + FileKind.DELETE, + entry.partition(), + entry.bucket(), + entry.totalBuckets(), + entry.file())); + } - // collect index files - if (latestSnapshot.indexManifest() != null) { - List entries = - indexManifestFile.read(latestSnapshot.indexManifest()); - for (IndexManifestEntry entry : entries) { - if (partitionFilter == null || partitionFilter.test(entry.partition())) { - indexChangesWithOverwrite.add(entry.toDeleteEntry()); - } + // collect index files + if (latestSnapshot.indexManifest() != null) { + List entries = + indexManifestFile.read(latestSnapshot.indexManifest()); + for (IndexManifestEntry entry : entries) { + if (partitionFilter == null || partitionFilter.test(entry.partition())) { + indexChangesWithOverwrite.add(entry.toDeleteEntry()); } } } - changesWithOverwrite.addAll(changes); - indexChangesWithOverwrite.addAll(indexFiles); - - CommitResult result = - tryCommitOnce( - null, - changesWithOverwrite, - Collections.emptyList(), - indexChangesWithOverwrite, - identifier, - watermark, - logOffsets, - Snapshot.CommitKind.OVERWRITE, - latestSnapshot, - mustConflictCheck(), - branchName, - null); - - if (result.isSuccess()) { - break; - } - - // TODO optimize OVERWRITE too - RetryResult retryResult = (RetryResult) result; - retryResult.cleanAll(); - - if (retryCount >= commitMaxRetries) { - throw new RuntimeException( - String.format( - "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.", - commitMaxRetries)); - } - retryCount++; } - return retryCount + 1; + changesWithOverwrite.addAll(changes); + indexChangesWithOverwrite.addAll(indexFiles); + + return tryCommit( + changesWithOverwrite, + Collections.emptyList(), + indexChangesWithOverwrite, + identifier, + watermark, + logOffsets, + Snapshot.CommitKind.OVERWRITE, + mustConflictCheck(), + branchName, + null); } @VisibleForTesting