diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 2bea3c2248df..25c29be9fbb8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -29,6 +29,7 @@ import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.FileStoreWrite; import org.apache.paimon.operation.FileStoreWrite.State; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.utils.Restorable; import java.util.List; @@ -48,6 +49,7 @@ public class TableWriteImpl implements InnerTableWrite, Restorable recordExtractor; private boolean batchCommitted = false; + private BucketMode bucketMode; public TableWriteImpl( FileStoreWrite write, @@ -92,6 +94,11 @@ public TableWriteImpl withCompactExecutor(ExecutorService compactExecutor) { return this; } + public TableWriteImpl withBucketMode(BucketMode bucketMode) { + this.bucketMode = bucketMode; + return this; + } + @Override public BinaryRow getPartition(InternalRow row) { keyAndBucketExtractor.setRecord(row); @@ -136,7 +143,7 @@ public SinkRecord toLogRecord(SinkRecord record) { keyAndBucketExtractor.setRecord(record.row()); return new SinkRecord( record.partition(), - record.bucket(), + bucketMode == BucketMode.UNAWARE ? -1 : record.bucket(), keyAndBucketExtractor.logPrimaryKey(), record.row()); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java index bb4c8645fb36..1fd80d356e55 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java @@ -81,6 +81,7 @@ public ProducerRecord serialize(SinkRecord element, @Nullable Lo } else { valueBytes = valueSerializer.serialize(new FlinkRowData(element.row())); } - return new ProducerRecord<>(topic, element.bucket(), primaryKeyBytes, valueBytes); + Integer partition = element.bucket() < 0 ? null : element.bucket(); + return new ProducerRecord<>(topic, partition, primaryKeyBytes, valueBytes); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java index 1e64f4e68717..9d8e2295a8de 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java @@ -63,6 +63,12 @@ public void testNonKeyed() throws Exception { checkNonKeyed(LogChangelogMode.ALL, 2, 5, 3); } + @Test + public void testUnawareBucket() throws Exception { + checkNonKeyed(LogChangelogMode.AUTO, -1, 3, 5); + checkNonKeyed(LogChangelogMode.ALL, -1, 5, 3); + } + private void checkKeyed(LogChangelogMode mode, int bucket, int key, int value) throws Exception { check(mode, true, bucket, key, value, RowKind.INSERT); @@ -92,7 +98,11 @@ private void check( SinkRecord input = testRecord(keyed, bucket, key, value, rowKind); ProducerRecord record = serializer.serialize(input, null); - assertThat(record.partition().intValue()).isEqualTo(bucket); + if (bucket >= 0) { + assertThat(record.partition().intValue()).isEqualTo(bucket); + } else { + assertThat(record.partition()).isNull(); + } AtomicReference rowReference = new AtomicReference<>(); deserializer.deserialize( @@ -129,7 +139,8 @@ public void close() {} } private ConsumerRecord toConsumerRecord(ProducerRecord record) { - return new ConsumerRecord<>(TOPIC, record.partition(), 0, record.key(), record.value()); + int partition = record.partition() == null ? -1 : record.partition(); + return new ConsumerRecord<>(TOPIC, partition, 0, record.key(), record.value()); } private static KafkaLogSerializationSchema createTestSerializationSchema( diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java index a32e7b54ccfc..d4a64ea542f6 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java @@ -26,12 +26,18 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.types.Row; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -370,4 +376,50 @@ public void testLogWriteReadWithVirtual() throws Exception { deleteTopicIfExists(topic); } } + + @Test + @Timeout(120) + public void testAppendOnlyWithUnawareBucket() throws Exception { + String topic = UUID.randomUUID().toString(); + createTopicIfNotExists(topic, 2); + + try { + // disable checkpointing to test eventual + env.getCheckpointConfig().disableCheckpointing(); + env.setParallelism(1); + tEnv.executeSql( + String.format( + "CREATE TABLE T (i INT, j INT) WITH (" + + "'log.system'='kafka', " + + "'log.consistency'='eventual', " + + "'bucket'='-1', " + + "'kafka.bootstrap.servers'='%s', " + + "'kafka.topic'='%s'," + + "'kafka.batch.size'='20')", + getBootstrapServers(), topic)); + tEnv.executeSql( + "CREATE TEMPORARY TABLE gen (i INT, j INT) WITH ('connector'='datagen', 'rows-per-second'='2')"); + TableResult write = tEnv.executeSql("INSERT INTO T SELECT * FROM gen"); + BlockingIterator read = + BlockingIterator.of(tEnv.executeSql("SELECT * FROM T").collect()); + List collect = read.collect(10); + assertThat(collect).hasSize(10); + write.getJobClient().get().cancel(); + read.close(); + + // check offsets + try (final AdminClient adminClient = AdminClient.create(getStandardProps())) { + Map topicPartitionOffsets = new HashMap<>(4); + for (int i = 0; i < 2; i++) { + topicPartitionOffsets.put(new TopicPartition(topic, i), OffsetSpec.latest()); + } + Map result = + adminClient.listOffsets(topicPartitionOffsets).all().get(); + assertThat(result.values()) + .allMatch(partitionOffsetInfo -> partitionOffsetInfo.offset() > 0); + } + } finally { + deleteTopicIfExists(topic); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index ceffa7648bfc..968ace78ac52 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -146,7 +146,8 @@ private TableWriteImpl newTableWrite(FileStoreTable table) { state.stateValueFilter().filter(table.name(), part, bucket)) .withIOManager(paimonIOManager) .withIgnorePreviousFiles(ignorePreviousFiles) - .withExecutionMode(isStreamingMode); + .withExecutionMode(isStreamingMode) + .withBucketMode(table.bucketMode()); if (metricGroup != null) { tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));