diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index cfea4ff57321..620bcbbc49b1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -69,18 +69,29 @@ public boolean forceCreatingSnapshot() { public ManifestCommittable combine( long checkpointId, long watermark, List committables) { ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId, watermark); - addFile(manifestCommittable, committables); - return manifestCommittable; + return combine(checkpointId, watermark, manifestCommittable, committables); } @Override public ManifestCommittable combine( long checkpointId, long watermark, - ManifestCommittable oldCommitable, + ManifestCommittable manifestCommittable, List committables) { - addFile(oldCommitable, committables); - return oldCommitable; + for (Committable committable : committables) { + switch (committable.kind()) { + case FILE: + CommitMessage file = (CommitMessage) committable.wrappedCommittable(); + manifestCommittable.addFileCommittable(file); + break; + case LOG_OFFSET: + LogOffsetCommittable offset = + (LogOffsetCommittable) committable.wrappedCommittable(); + manifestCommittable.addLogOffset(offset.bucket(), offset.offset()); + break; + } + } + return manifestCommittable; } @Override @@ -109,22 +120,6 @@ public void close() throws Exception { commit.close(); } - private void addFile(ManifestCommittable manifestCommittable, List committables) { - for (Committable committable : committables) { - switch (committable.kind()) { - case FILE: - CommitMessage file = (CommitMessage) committable.wrappedCommittable(); - manifestCommittable.addFileCommittable(file); - break; - case LOG_OFFSET: - LogOffsetCommittable offset = - (LogOffsetCommittable) committable.wrappedCommittable(); - manifestCommittable.addLogOffset(offset.bucket(), offset.offset()); - break; - } - } - } - private void calcNumBytesAndRecordsOut(List committables) { if (committerMetrics == null) { return; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index 4fc82c48a189..a2d3f16ba021 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -89,8 +89,7 @@ public WrappedManifestCommittable combine( long checkpointId, long watermark, List committables) { WrappedManifestCommittable wrappedManifestCommittable = new WrappedManifestCommittable(checkpointId, watermark); - combineFile(checkpointId, watermark, wrappedManifestCommittable, committables); - return wrappedManifestCommittable; + return combine(checkpointId, watermark, wrappedManifestCommittable, committables); } @Override @@ -99,7 +98,25 @@ public WrappedManifestCommittable combine( long watermark, WrappedManifestCommittable wrappedManifestCommittable, List committables) { - combineFile(checkpointId, watermark, wrappedManifestCommittable, committables); + for (MultiTableCommittable committable : committables) { + ManifestCommittable manifestCommittable = + wrappedManifestCommittable.computeCommittableIfAbsent( + Identifier.create(committable.getDatabase(), committable.getTable()), + checkpointId, + watermark); + + switch (committable.kind()) { + case FILE: + CommitMessage file = (CommitMessage) committable.wrappedCommittable(); + manifestCommittable.addFileCommittable(file); + break; + case LOG_OFFSET: + LogOffsetCommittable offset = + (LogOffsetCommittable) committable.wrappedCommittable(); + manifestCommittable.addLogOffset(offset.bucket(), offset.offset()); + break; + } + } return wrappedManifestCommittable; } @@ -168,32 +185,6 @@ public Map> groupByCheckpoint( return grouped; } - private void combineFile( - long checkpointId, - long watermark, - WrappedManifestCommittable wrappedManifestCommittable, - List committables) { - for (MultiTableCommittable committable : committables) { - ManifestCommittable manifestCommittable = - wrappedManifestCommittable.computeCommittableIfAbsent( - Identifier.create(committable.getDatabase(), committable.getTable()), - checkpointId, - watermark); - - switch (committable.kind()) { - case FILE: - CommitMessage file = (CommitMessage) committable.wrappedCommittable(); - manifestCommittable.addFileCommittable(file); - break; - case LOG_OFFSET: - LogOffsetCommittable offset = - (LogOffsetCommittable) committable.wrappedCommittable(); - manifestCommittable.addLogOffset(offset.bucket(), offset.offset()); - break; - } - } - } - private StoreCommitter getStoreCommitter(Identifier tableId) { StoreCommitter committer = tableCommitters.get(tableId);