From 30bf5036d6117aedce7297dacec23155a7d5778c Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Wed, 13 Nov 2024 17:04:39 +0800 Subject: [PATCH] [flink] support multiple writers writing to the same partition when using kafka as logSystem in unaware bucket mode. (#4516) --- .../apache/paimon/manifest/ManifestCommittable.java | 7 ++++--- .../src/test/java/org/apache/paimon/TestFileStore.java | 3 ++- .../manifest/ManifestCommittableSerializerTest.java | 2 +- .../org/apache/paimon/flink/sink/StoreCommitter.java | 10 +++++++++- .../apache/paimon/flink/sink/StoreMultiCommitter.java | 10 ++++++---- .../sink/WrappedManifestCommittableSerializerTest.java | 2 +- 6 files changed, 23 insertions(+), 11 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java index 61c4619bd6d6..b4abd0e9ec0e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java @@ -62,13 +62,14 @@ public void addFileCommittable(CommitMessage commitMessage) { commitMessages.add(commitMessage); } - public void addLogOffset(int bucket, long offset) { - if (logOffsets.containsKey(bucket)) { + public void addLogOffset(int bucket, long offset, boolean allowDuplicate) { + if (!allowDuplicate && logOffsets.containsKey(bucket)) { throw new RuntimeException( String.format( "bucket-%d appears multiple times, which is not possible.", bucket)); } - logOffsets.put(bucket, offset); + long newOffset = Math.max(logOffsets.getOrDefault(bucket, offset), offset); + logOffsets.put(bucket, newOffset); } public long identifier() { diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 303879337780..5218a515a337 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -222,7 +222,8 @@ public List commitData( null, Collections.emptyList(), (commit, committable) -> { - logOffsets.forEach(committable::addLogOffset); + logOffsets.forEach( + (bucket, offset) -> committable.addLogOffset(bucket, offset, false)); commit.commit(committable, Collections.emptyMap()); }); } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java index c179a2c0a789..8de8309bc8fb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java @@ -83,7 +83,7 @@ private static void addFileCommittables( if (!committable.logOffsets().containsKey(bucket)) { int offset = ID.incrementAndGet(); - committable.addLogOffset(bucket, offset); + committable.addLogOffset(bucket, offset, false); assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index d237f4da56cf..4908b99317ba 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -23,6 +23,7 @@ import org.apache.paimon.flink.sink.partition.PartitionListeners; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; @@ -44,6 +45,7 @@ public class StoreCommitter implements Committer committables) { if (committerMetrics == null) { return; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index aeb3e1857b9b..537a98f97fb0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -92,11 +92,11 @@ public WrappedManifestCommittable combine( WrappedManifestCommittable wrappedManifestCommittable, List committables) { for (MultiTableCommittable committable : committables) { + Identifier identifier = + Identifier.create(committable.getDatabase(), committable.getTable()); ManifestCommittable manifestCommittable = wrappedManifestCommittable.computeCommittableIfAbsent( - Identifier.create(committable.getDatabase(), committable.getTable()), - checkpointId, - watermark); + identifier, checkpointId, watermark); switch (committable.kind()) { case FILE: @@ -106,7 +106,9 @@ public WrappedManifestCommittable combine( case LOG_OFFSET: LogOffsetCommittable offset = (LogOffsetCommittable) committable.wrappedCommittable(); - manifestCommittable.addLogOffset(offset.bucket(), offset.offset()); + StoreCommitter committer = tableCommitters.get(identifier); + manifestCommittable.addLogOffset( + offset.bucket(), offset.offset(), committer.allowLogOffsetDuplicate()); break; } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java index 298f3155ba34..b0aa76f157ac 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java @@ -98,7 +98,7 @@ public static void addFileCommittables( if (!committable.logOffsets().containsKey(bucket)) { int offset = ID.incrementAndGet(); - committable.addLogOffset(bucket, offset); + committable.addLogOffset(bucket, offset, false); assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset); } }