diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java index 61c4619bd6d6a..b4abd0e9ec0ed 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java @@ -62,13 +62,14 @@ public void addFileCommittable(CommitMessage commitMessage) { commitMessages.add(commitMessage); } - public void addLogOffset(int bucket, long offset) { - if (logOffsets.containsKey(bucket)) { + public void addLogOffset(int bucket, long offset, boolean allowDuplicate) { + if (!allowDuplicate && logOffsets.containsKey(bucket)) { throw new RuntimeException( String.format( "bucket-%d appears multiple times, which is not possible.", bucket)); } - logOffsets.put(bucket, offset); + long newOffset = Math.max(logOffsets.getOrDefault(bucket, offset), offset); + logOffsets.put(bucket, newOffset); } public long identifier() { diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 303879337780f..d37027c789769 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -221,10 +221,12 @@ public List commitData( null, null, Collections.emptyList(), - (commit, committable) -> { - logOffsets.forEach(committable::addLogOffset); - commit.commit(committable, Collections.emptyMap()); - }); + (commit, committable) -> + logOffsets.forEach( + (bucket, offset) -> { + committable.addLogOffset(bucket, offset, false); + commit.commit(committable, Collections.emptyMap()); + })); } public List overwriteData( diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java index c179a2c0a789f..8de8309bc8fbb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java @@ -83,7 +83,7 @@ private static void addFileCommittables( if (!committable.logOffsets().containsKey(bucket)) { int offset = ID.incrementAndGet(); - committable.addLogOffset(bucket, offset); + committable.addLogOffset(bucket, offset, false); assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index d237f4da56cf7..4908b99317bae 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -23,6 +23,7 @@ import org.apache.paimon.flink.sink.partition.PartitionListeners; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; @@ -44,6 +45,7 @@ public class StoreCommitter implements Committer committables) { if (committerMetrics == null) { return; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index aeb3e1857b9b7..537a98f97fb03 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -92,11 +92,11 @@ public WrappedManifestCommittable combine( WrappedManifestCommittable wrappedManifestCommittable, List committables) { for (MultiTableCommittable committable : committables) { + Identifier identifier = + Identifier.create(committable.getDatabase(), committable.getTable()); ManifestCommittable manifestCommittable = wrappedManifestCommittable.computeCommittableIfAbsent( - Identifier.create(committable.getDatabase(), committable.getTable()), - checkpointId, - watermark); + identifier, checkpointId, watermark); switch (committable.kind()) { case FILE: @@ -106,7 +106,9 @@ public WrappedManifestCommittable combine( case LOG_OFFSET: LogOffsetCommittable offset = (LogOffsetCommittable) committable.wrappedCommittable(); - manifestCommittable.addLogOffset(offset.bucket(), offset.offset()); + StoreCommitter committer = tableCommitters.get(identifier); + manifestCommittable.addLogOffset( + offset.bucket(), offset.offset(), committer.allowLogOffsetDuplicate()); break; } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java index 298f3155ba34e..b0aa76f157ac8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java @@ -98,7 +98,7 @@ public static void addFileCommittables( if (!committable.logOffsets().containsKey(bucket)) { int offset = ID.incrementAndGet(); - committable.addLogOffset(bucket, offset); + committable.addLogOffset(bucket, offset, false); assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset); } }