diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index fcd0eeb8896c..64a543bebd6e 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -83,9 +83,12 @@ public static KafkaSource buildKafkaSource(Configuration kafkaConfig) { Pattern.compile(kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN))); } - kafkaSourceBuilder - .setValueOnlyDeserializer(new SimpleStringSchema()) - .setGroupId(kafkaPropertiesGroupId(kafkaConfig)); + KafkaValueOnlyDeserializationSchemaWrapper schema = + new KafkaValueOnlyDeserializationSchemaWrapper<>(new SimpleStringSchema()); + kafkaSourceBuilder.setDeserializer(schema); + + kafkaSourceBuilder.setGroupId(kafkaPropertiesGroupId(kafkaConfig)); + Properties properties = createKafkaProperties(kafkaConfig); StartupMode startupMode = diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java new file mode 100644 index 000000000000..5e6b96670bdb --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.util.Collector; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A class that wraps a {@link DeserializationSchema} as the value deserializer for a {@link + * ConsumerRecord}. + * + * @param the return type of the deserialization. + */ +class KafkaValueOnlyDeserializationSchemaWrapper implements KafkaRecordDeserializationSchema { + private static final long serialVersionUID = 1L; + private final DeserializationSchema deserializationSchema; + private static final Logger LOG = + LoggerFactory.getLogger(KafkaValueOnlyDeserializationSchemaWrapper.class); + + KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema deserializationSchema) { + this.deserializationSchema = deserializationSchema; + } + + @Override + public void open(DeserializationSchema.InitializationContext context) throws Exception { + deserializationSchema.open(context); + } + + @Override + public void deserialize(ConsumerRecord message, Collector out) + throws IOException { + if (message.value() != null) { + deserializationSchema.deserialize(message.value(), out); + } else { + // see + // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events + LOG.info( + "Found null message value:\n{}\nThis message will be ignored. It might be produced by tombstone-event, " + + "please check your Debezium and Kafka configuration.", + message); + } + } + + @Override + public TypeInformation getProducedType() { + return deserializationSchema.getProducedType(); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java index 7aba174d3dfc..ba96331621d7 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java @@ -18,9 +18,24 @@ package org.apache.paimon.flink.action.cdc.kafka; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT; + /** IT cases for {@link KafkaSyncTableAction}. */ public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCase { @@ -103,4 +118,48 @@ public void testSchemaIncludeRecord1() throws Exception { public void testAllTypesWithSchema() throws Exception { testAllTypesWithSchemaImpl(DEBEZIUM); } + + @Test + @Timeout(60) + public void testMessageWithNullValue() throws Exception { + final String topic = "test_null_value"; + createTestTopic(topic, 1, 1); + + List lines = readLines("kafka/debezium/table/nullvalue/debezium-data-1.txt"); + writeRecordsToKafka(topic, lines); + + // write null value + Properties producerProperties = getStandardProps(); + producerProperties.setProperty("retries", "0"); + producerProperties.put( + "key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProperties.put( + "value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + KafkaProducer kafkaProducer = new KafkaProducer<>(producerProperties); + kafkaProducer.send(new ProducerRecord<>(topic, null)); + kafkaProducer.close(); + + lines = readLines("kafka/debezium/table/nullvalue/debezium-data-2.txt"); + writeRecordsToKafka(topic, lines); + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json"); + kafkaConfig.put(TOPIC.key(), topic); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.STRING().notNull(), DataTypes.STRING()}, + new String[] {"id", "value"}); + waitForResult( + Arrays.asList("+I[1, A]", "+I[2, B]"), + getFileStoreTable(tableName), + rowType, + Collections.singletonList("id")); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-1.txt new file mode 100644 index 000000000000..fda10c2c1eb8 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-1.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"before": null, "after": {"id": 1, "value": "A"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "test", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-2.txt new file mode 100644 index 000000000000..d8de79c2948c --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nullvalue/debezium-data-2.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"before": null, "after": {"id": 2, "value": "B"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684884000, "snapshot": "false", "db": "test", "sequence": null, "table": "test", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684884000, "transaction": null}