Commit

[cdc] Fix that kafka message value might be null which causes NPE (ap…
yuzelin authored Mar 29, 2024
1 parent 3d6c5fc commit dfcf1e1
Showing 5 changed files with 174 additions and 3 deletions.
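
Background on the NPE: the source previously used setValueOnlyDeserializer(new SimpleStringSchema()), and SimpleStringSchema builds a String directly from the record's value bytes, so a record whose value is null (for example the tombstone Debezium emits after a DELETE so that Kafka log compaction can drop the row) fails with a NullPointerException. A minimal sketch of that failure mode, assuming Flink's SimpleStringSchema; the class below is illustrative and not part of this commit:

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.nio.charset.StandardCharsets;

public class NullValueNpeSketch {
    public static void main(String[] args) throws Exception {
        SimpleStringSchema schema = new SimpleStringSchema();

        // A normal record value deserializes fine.
        System.out.println(schema.deserialize("{\"op\": \"c\"}".getBytes(StandardCharsets.UTF_8)));

        // A tombstone record carries a null value; passing it through throws,
        // because the schema has no null check before building the String.
        byte[] tombstoneValue = null;
        schema.deserialize(tombstoneValue); // NullPointerException
    }
}
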
@@ -83,9 +83,12 @@ public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
                             Pattern.compile(kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN)));
         }
 
-        kafkaSourceBuilder
-                .setValueOnlyDeserializer(new SimpleStringSchema())
-                .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+        KafkaValueOnlyDeserializationSchemaWrapper<String> schema =
+                new KafkaValueOnlyDeserializationSchemaWrapper<>(new SimpleStringSchema());
+        kafkaSourceBuilder.setDeserializer(schema);
+
+        kafkaSourceBuilder.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+
         Properties properties = createKafkaProperties(kafkaConfig);
 
         StartupMode startupMode =
KafkaValueOnlyDeserializationSchemaWrapper.java (new file)
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* A class that wraps a {@link DeserializationSchema} as the value deserializer for a {@link
* ConsumerRecord}.
*
* @param <T> the return type of the deserialization.
*/
class KafkaValueOnlyDeserializationSchemaWrapper<T> implements KafkaRecordDeserializationSchema<T> {
    private static final long serialVersionUID = 1L;
    private final DeserializationSchema<T> deserializationSchema;
    private static final Logger LOG =
            LoggerFactory.getLogger(KafkaValueOnlyDeserializationSchemaWrapper.class);

    KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        deserializationSchema.open(context);
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
            throws IOException {
        if (message.value() != null) {
            deserializationSchema.deserialize(message.value(), out);
        } else {
            // see
            // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events
            LOG.info(
                    "Found null message value:\n{}\nThis message will be ignored. It might be produced by tombstone-event, "
                            + "please check your Debezium and Kafka configuration.",
                    message);
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return deserializationSchema.getProducedType();
    }
}
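
A usage sketch (not part of the commit) showing the wrapper's behavior on a null-value record; the demo class and the inline Collector are hypothetical, and the snippet assumes it sits in the same package because the wrapper is package-private:

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.ArrayList;
import java.util.List;

class TombstoneHandlingSketch {
    public static void main(String[] args) throws Exception {
        KafkaValueOnlyDeserializationSchemaWrapper<String> wrapper =
                new KafkaValueOnlyDeserializationSchemaWrapper<>(new SimpleStringSchema());

        // A record whose value is null, as left behind by a Debezium tombstone event.
        ConsumerRecord<byte[], byte[]> tombstone = new ConsumerRecord<>("topic", 0, 0L, null, null);

        List<String> collected = new ArrayList<>();
        Collector<String> collector =
                new Collector<String>() {
                    @Override
                    public void collect(String record) {
                        collected.add(record);
                    }

                    @Override
                    public void close() {}
                };

        // The null value is logged and skipped instead of reaching
        // SimpleStringSchema, so nothing is collected and no NPE is thrown.
        wrapper.deserialize(tombstone, collector);
        System.out.println(collected.isEmpty()); // true
    }
}
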
KafkaDebeziumSyncTableActionITCase.java
@@ -18,9 +18,24 @@

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;

/** IT cases for {@link KafkaSyncTableAction}. */
public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCase {

@@ -103,4 +118,48 @@ public void testSchemaIncludeRecord1() throws Exception {
    public void testAllTypesWithSchema() throws Exception {
        testAllTypesWithSchemaImpl(DEBEZIUM);
    }

    @Test
    @Timeout(60)
    public void testMessageWithNullValue() throws Exception {
        final String topic = "test_null_value";
        createTestTopic(topic, 1, 1);

        List<String> lines = readLines("kafka/debezium/table/nullvalue/debezium-data-1.txt");
        writeRecordsToKafka(topic, lines);

        // write null value
        Properties producerProperties = getStandardProps();
        producerProperties.setProperty("retries", "0");
        producerProperties.put(
                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.put(
                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProperties);
        kafkaProducer.send(new ProducerRecord<>(topic, null));
        kafkaProducer.close();

        lines = readLines("kafka/debezium/table/nullvalue/debezium-data-2.txt");
        writeRecordsToKafka(topic, lines);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
        kafkaConfig.put(TOPIC.key(), topic);
        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.STRING().notNull(), DataTypes.STRING()},
                        new String[] {"id", "value"});
        waitForResult(
                Arrays.asList("+I[1, A]", "+I[2, B]"),
                getFileStoreTable(tableName),
                rowType,
                Collections.singletonList("id"));
    }
}
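
Note that the test writes the null value with new ProducerRecord<>(topic, null), i.e. without a key. A real Debezium tombstone after a DELETE carries the deleted row's key and a null value so that Kafka log compaction can remove earlier records for that key; a hedged sketch of producing such a record is below (broker address and key JSON are illustrative, and the exact key layout depends on connector configuration):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TombstoneProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The deleted row's key paired with a null value: this is what a
            // tombstone looks like, and what the new wrapper now logs and skips.
            producer.send(new ProducerRecord<>("test_null_value", "{\"id\": 1}", null));
        }
    }
}
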
kafka/debezium/table/nullvalue/debezium-data-1.txt (new file)
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"before": null, "after": {"id": 1, "value": "A"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "test", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
kafka/debezium/table/nullvalue/debezium-data-2.txt (new file)
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"before": null, "after": {"id": 2, "value": "B"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684884000, "snapshot": "false", "db": "test", "sequence": null, "table": "test", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684884000, "transaction": null}
