diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index d45a9336847df..ac88f887e6aef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -35,6 +35,8 @@
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
 
 import javax.annotation.Nullable;
@@ -42,6 +44,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -120,6 +123,16 @@ public static <T extends JsonNode> T getNodeAs(
                         fieldName, clazz.getName(), node.getClass().getName()));
     }
 
+    public static ObjectNode appendArrayNode(
+            ObjectNode node, String fieldName, List<String> elements) {
+        ArrayNode arrayNode = OBJECT_MAPPER_INSTANCE.createArrayNode();
+        for (String element : elements) {
+            arrayNode.add(element);
+        }
+        node.set(fieldName, arrayNode);
+        return node;
+    }
+
     public static <T> T fromJson(String json, Class<T> clazz) {
         try {
             return OBJECT_MAPPER_INSTANCE.reader().readValue(json, clazz);
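For illustration, a minimal sketch (not part of the patch) of what the new helper produces; the field name mirrors the CDC use further down:

    ObjectNode value = JsonSerdeUtil.fromJson("{\"id\": 101}", ObjectNode.class);
    JsonSerdeUtil.appendArrayNode(value, "pkNames", java.util.Arrays.asList("id"));
    // value now serializes to {"id":101,"pkNames":["id"]}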
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
index 590f63911bc25..066dc32252e57 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
@@ -21,7 +21,9 @@
 import org.apache.paimon.flink.action.cdc.SyncTableActionBase.SchemaRetrievalException;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
 import org.apache.paimon.flink.action.cdc.format.RecordParser;
+import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.utils.Pair;
 
 import java.util.Collections;
 import java.util.List;
@@ -55,6 +57,7 @@ public static Schema getSchema(
         while (true) {
             Optional<Schema> schema =
                     consumer.getRecords(POLL_TIMEOUT_MILLIS).stream()
+                            .map(DebeziumSchemaUtils::extractPrimaryKeys)
                             .map(recordParser::buildSchema)
                             .filter(Objects::nonNull)
                             .findFirst();
@@ -89,7 +92,7 @@ private static void sleepSafely(int duration) {
 
     /** Wrap the consumer for different message queues. */
     public interface ConsumerWrapper extends AutoCloseable {
 
-        List<String> getRecords(int pollTimeOutMills);
+        List<Pair<String, String>> getRecords(int pollTimeOutMills);
 
         String topic();
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 91bc946c8655a..19138073b506d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -175,7 +175,7 @@ protected void evalComputedColumns(
                 });
     }
 
-    private List<String> extractPrimaryKeys() {
+    protected List<String> extractPrimaryKeys() {
         ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
         if (pkNames == null) {
             return Collections.emptyList();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
index d4d5b47a25d6a..644bf2495c73a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -63,12 +63,12 @@
  */
 public class DebeziumRecordParser extends RecordParser {
 
-    private static final String FIELD_SCHEMA = "schema";
-    protected static final String FIELD_PAYLOAD = "payload";
+    public static final String FIELD_SCHEMA = "schema";
+    public static final String FIELD_PAYLOAD = "payload";
     private static final String FIELD_BEFORE = "before";
     private static final String FIELD_AFTER = "after";
     private static final String FIELD_SOURCE = "source";
-    private static final String FIELD_PRIMARY = "pkNames";
+    public static final String FIELD_PRIMARY = "pkNames";
     private static final String FIELD_DB = "db";
     private static final String FIELD_TYPE = "op";
     private static final String OP_INSERT = "c";
@@ -81,6 +81,8 @@ public class DebeziumRecordParser extends RecordParser {
     private final Map<String, String> classNames = new HashMap<>();
     private final Map<String, Map<String, String>> parameters = new HashMap<>();
 
+    private final List<String> primaryKeys = new ArrayList<>();
+
     public DebeziumRecordParser(
             boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
         super(caseSensitive, typeMapping, computedColumns);
@@ -120,7 +122,7 @@ private JsonNode getBefore(String op) {
     @Override
     protected void setRoot(String record) {
         JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class);
-
+        preparePrimaryKeysIfNeed(node);
         hasSchema = false;
         if (node.has(FIELD_SCHEMA)) {
             root = node.get(FIELD_PAYLOAD);
@@ -134,6 +136,16 @@ protected void setRoot(String record) {
         }
     }
 
+    private void preparePrimaryKeysIfNeed(JsonNode node) {
+        primaryKeys.clear();
+        if (node.has(FIELD_PRIMARY)) {
+            ArrayNode primaryKeyNode = (ArrayNode) node.get(FIELD_PRIMARY);
+            for (JsonNode element : primaryKeyNode) {
+                primaryKeys.add(element.asText());
+            }
+        }
+    }
+
     private void parseSchema(JsonNode schema) {
         debeziumTypes.clear();
         classNames.clear();
@@ -212,6 +224,11 @@ protected Map<String, String> extractRowData(
         return resultMap;
     }
 
+    @Override
+    protected List<String> extractPrimaryKeys() {
+        return primaryKeys;
+    }
+
     @Override
     protected String primaryField() {
         return FIELD_PRIMARY;
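A sketch of the record shape the parser now consumes (abbreviated, not part of the patch): the key-derived primary keys travel inside the value under "pkNames", and setRoot stashes them before the payload is parsed:

    String enriched =
            "{\"schema\": null, \"payload\": {\"after\": {\"id\": 101}, \"op\": \"c\"}, \"pkNames\": [\"id\"]}";
    // setRoot(enriched) refills `primaryKeys` with ["id"], so the overridden
    // extractPrimaryKeys() works even when the payload itself carries no pkNames field.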
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
index 13874872bac5d..ff36247aa400a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
@@ -24,9 +24,14 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import io.debezium.data.Bits;
 import io.debezium.data.geometry.Geometry;
@@ -46,9 +51,13 @@
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Base64;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PAYLOAD;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PRIMARY;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_SCHEMA;
 
 /**
  * Utils to handle 'schema' field in debezium Json. TODO: The methods have many duplicate codes with
@@ -56,6 +65,34 @@
  */
 public class DebeziumSchemaUtils {
 
+    /** Extract primary-key names from the record key and append them to the value as 'pkNames'. */
+    public static String extractPrimaryKeys(Pair<String, String> record) {
+        String key = record.getKey();
+        if (StringUtils.isBlank(key)) {
+            return record.getValue();
+        }
+        List<String> primaryKeys = Lists.newArrayList();
+        JsonNode keyNode = JsonSerdeUtil.fromJson(key, JsonNode.class);
+        JsonNode payload;
+        if (keyNode.has(FIELD_SCHEMA)) {
+            payload = keyNode.get(FIELD_PAYLOAD);
+        } else {
+            payload = keyNode;
+        }
+        payload.fieldNames().forEachRemaining(primaryKeys::add);
+        ObjectNode valueNode = JsonSerdeUtil.fromJson(record.getValue(), ObjectNode.class);
+
+        // append primary keys
+        JsonSerdeUtil.appendArrayNode(valueNode, FIELD_PRIMARY, primaryKeys);
+        try {
+            return JsonSerdeUtil.writeValueAsString(valueNode);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(
+                    "An error occurred while attaching the Debezium primary keys to the value", e);
+        }
+    }
+
     /** Transform raw string value according to schema. */
     public static String transformRawValue(
             @Nullable String rawValue,
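For illustration, the transformation end to end (a sketch using the test data from this patch):

    Pair<String, String> record =
            Pair.of(
                    "{\"schema\": null, \"payload\": {\"id\": 101}}", // message key
                    "{\"schema\": null, \"payload\": {\"after\": {\"id\": 101}, \"op\": \"c\"}}");
    String enriched = DebeziumSchemaUtils.extractPrimaryKeys(record);
    // -> {"schema":null,"payload":{"after":{"id":101},"op":"c"},"pkNames":["id"]}
    // A blank key returns the value unchanged, so key-less topics keep working.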
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index fcd0eeb8896cf..531fe374ae6b9 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -20,9 +20,9 @@
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
@@ -34,7 +34,6 @@
 import org.apache.flink.table.api.ValidationException;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -84,7 +83,9 @@ public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
         }
 
         kafkaSourceBuilder
-                .setValueOnlyDeserializer(new SimpleStringSchema())
+                .setDeserializer(
+                        new KafkaKeyValueDeserializationSchema(
+                                kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT)))
                 .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
 
         Properties properties = createKafkaProperties(kafkaConfig);
@@ -316,11 +318,11 @@ private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.Con
         }
 
         @Override
-        public List<String> getRecords(int pollTimeOutMills) {
+        public List<Pair<String, String>> getRecords(int pollTimeOutMills) {
             ConsumerRecords<String, String> consumerRecords =
                     consumer.poll(Duration.ofMillis(pollTimeOutMills));
             return StreamSupport.stream(consumerRecords.records(topic).spliterator(), false)
-                    .map(ConsumerRecord::value)
+                    .map(record -> Pair.of(record.key(), record.value()))
                     .collect(Collectors.toList());
         }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java
new file mode 100644
index 0000000000000..4e02ce9ea3453
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+/** Deserialization for kafka key and value. */
+public class KafkaKeyValueDeserializationSchema
+        implements KafkaRecordDeserializationSchema<String> {
+
+    private final String charset;
+    private final String format;
+
+    public KafkaKeyValueDeserializationSchema(String format) {
+        this(StandardCharsets.UTF_8.name(), format);
+    }
+
+    public KafkaKeyValueDeserializationSchema(String charset, String format) {
+        this.charset = Preconditions.checkNotNull(charset);
+        this.format = format;
+    }
+
+    @Override
+    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> collector)
+            throws IOException {
+        DataFormat dataFormat = DataFormat.fromConfigString(format);
+        String value = new String(record.value(), Charset.forName(charset));
+        switch (dataFormat) {
+            case DEBEZIUM_JSON:
+                if (Objects.isNull(record.key())) {
+                    collector.collect(value);
+                } else {
+                    String key = new String(record.key(), Charset.forName(charset));
+                    Pair<String, String> keyValue = Pair.of(key, value);
+                    collector.collect(DebeziumSchemaUtils.extractPrimaryKeys(keyValue));
+                }
+                break;
+            default:
+                collector.collect(value);
+                break;
+        }
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+}
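A behavior sketch for the new schema (hypothetical inputs, value format "debezium-json" as used by the tests below):

    KafkaKeyValueDeserializationSchema schema =
            new KafkaKeyValueDeserializationSchema("debezium-json");
    // key == null                      -> the value is emitted unchanged
    // key == {"payload": {"id": 101}}  -> the emitted value gains "pkNames":["id"]
    // any other value format           -> the key is ignored, value passes through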
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index 27560b835ec17..3cead8dc534d4 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -20,7 +20,7 @@
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.utils.Pair;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.ConfigOption;
@@ -34,6 +35,7 @@
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl.TopicPatternSubscriber;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.KeySharedPolicy;
@@ -170,6 +172,8 @@ public static PulsarSource<String> buildPulsarSource(Configuration pulsarConfig)
                         .setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
                         .setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
                         .setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
-                        .setDeserializationSchema(new SimpleStringSchema());
+                        .setDeserializationSchema(
+                                new PulsarKeyValueDeserializationSchema(
+                                        pulsarConfig.get(KafkaConnectorOptions.VALUE_FORMAT)));
 
         pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
@@ -382,12 +387,12 @@ private static class PulsarConsumerWrapper implements MessageQueueSchemaUtils.Co
 
         @Override
-        public List<String> getRecords(int pollTimeOutMills) {
+        public List<Pair<String, String>> getRecords(int pollTimeOutMills) {
             try {
                 Message<String> message = consumer.receive(pollTimeOutMills, TimeUnit.MILLISECONDS);
                 return message == null
                         ? Collections.emptyList()
                        : Collections.singletonList(Pair.of(message.getKey(), message.getValue()));
             } catch (PulsarClientException e) {
                 throw new RuntimeException(e);
             }
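As written, the Pulsar path reads the format from Kafka's VALUE_FORMAT option key, so the same value-format setting drives both connectors; a construction sketch:

    PulsarKeyValueDeserializationSchema schema =
            new PulsarKeyValueDeserializationSchema(
                    pulsarConfig.get(KafkaConnectorOptions.VALUE_FORMAT)); // e.g. "debezium-json"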
*/
+public class PulsarKeyValueDeserializationSchema
+        implements PulsarDeserializationSchema<String> {
+
+    private final String charset;
+    private final String format;
+
+    public PulsarKeyValueDeserializationSchema(String format) {
+        this(StandardCharsets.UTF_8.name(), format);
+    }
+
+    public PulsarKeyValueDeserializationSchema(String charset, String format) {
+        this.charset = Preconditions.checkNotNull(charset);
+        this.format = format;
+    }
+
+    @Override
+    public void deserialize(Message<byte[]> message, Collector<String> collector) throws Exception {
+        DataFormat dataFormat = DataFormat.fromConfigString(format);
+        String value = new String(message.getValue(), Charset.forName(charset));
+        switch (dataFormat) {
+            case DEBEZIUM_JSON:
+                if (StringUtils.isBlank(message.getKey())) {
+                    collector.collect(value);
+                } else {
+                    Pair<String, String> keyValue = Pair.of(message.getKey(), value);
+                    collector.collect(DebeziumSchemaUtils.extractPrimaryKeys(keyValue));
+                }
+                break;
+            default:
+                collector.collect(value);
+                break;
+        }
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 9db87a556f96e..09f485c4fe3fc 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -299,6 +299,29 @@ void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
         kafkaProducer.close();
     }
 
+    void writeRecordsToKafka(String topic, Map<String, String> data) throws Exception {
+        Properties producerProperties = getStandardProps();
+        producerProperties.setProperty("retries", "0");
+        producerProperties.put(
+                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProperties.put(
+                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProperties);
+        for (Map.Entry<String, String> entry : data.entrySet()) {
+            try {
+                // Parse both sides so that only well-formed JSON key/value pairs are sent.
+                objectMapper.readTree(entry.getKey());
+                objectMapper.readTree(entry.getValue());
+                if (!StringUtils.isEmpty(entry.getValue())) {
+                    kafkaProducer.send(
+                            new ProducerRecord<>(topic, entry.getKey(), entry.getValue()));
+                }
+            } catch (Exception e) {
+                // Skip entries that are not valid JSON.
+            }
+        }
+        kafkaProducer.close();
+    }
+
     /** Kafka container extension for junit5. */
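The data files consumed by this helper keep the record key and value on one line, separated by ';' (see the resource files at the end of the patch); a sketch of how the tests below turn a line into a map entry:

    // {"schema": null, "payload": {"id": 101}};{"schema": null, "payload": {...}}
    //  \_______________ key _______________/ \_____________ value ____________/
    String[] splitLines = line.split(";", 2);
    keyValues.put(splitLines[0], splitLines[1]);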
    private static class KafkaContainerExtension extends KafkaContainer
            implements BeforeAllCallback, AfterAllCallback {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 7aba174d3dfcf..0173ff368d4a9 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -103,4 +103,16 @@ public void testSchemaIncludeRecord1() throws Exception {
     public void testAllTypesWithSchema() throws Exception {
         testAllTypesWithSchemaImpl(DEBEZIUM);
     }
+
+    @Test
+    @Timeout(120)
+    public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
+        testRecordWithPrimaryKeys(DEBEZIUM);
+    }
+
+    @Test
+    @Timeout(120)
+    public void testSchemaIncludeRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
+        testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 47ab426c2700d..023f44942f494 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -33,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -702,6 +703,94 @@ public void testSchemaIncludeRecord(String format) throws Exception {
         waitForResult(expected, table, rowType, primaryKeys);
     }
 
+    public void testRecordWithPrimaryKeys(String format) throws Exception {
+        String topic = "no_schema_include_with_primary_keys";
+        createTestTopic(topic, 1, 1);
+
+        List<String> lines =
+                readLines("kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
+        Map<String, String> keyValues = new HashMap<>();
+        for (String line : lines) {
+            String[] splitLines = line.split(";", 2);
+            if (splitLines.length > 1) {
+                keyValues.put(splitLines[0], splitLines[1]);
+            }
+        }
+        try {
+            writeRecordsToKafka(topic, keyValues);
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception {
+        String topic = "schema_include_with_primary_keys";
"schema_include_with_primary_keys"; + createTestTopic(topic, 1, 1); + + List lines = + readLines( + "kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt"); + Map keyValues = new HashMap<>(); + for (String line : lines) { + String[] splitLines = line.split(";"); + if (splitLines.length > 1) { + keyValues.put(splitLines[0], splitLines[1]); + } + } + try { + writeRecordsToKafka(topic, keyValues); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); + kafkaConfig.put(TOPIC.key(), topic); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build(); + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.DOUBLE() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + List expected = + Collections.singletonList( + "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]"); + waitForResult(expected, table, rowType, primaryKeys); + } + // TODO some types are different from mysql cdc; maybe need to fix public void testAllTypesWithSchemaImpl(String format) throws Exception { String topic = "schema_include_all_type"; diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt new file mode 100644 index 0000000000000..9bb00c7786d8c --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+{"schema": null ,"payload": {"id": 101}};{"schema":null, "payload":{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
+{"schema": null ,"payload": {"id": 102}};{"schema":null, "payload":{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt
new file mode 100644
index 0000000000000..1feaafe9fc20b
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"schema": null ,"payload": {"id": 101}};{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}
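Read together, the pieces above compose as follows (a summary sketch, not part of the patch):

    // 1. KafkaKeyValueDeserializationSchema / PulsarKeyValueDeserializationSchema decode key + value.
    // 2. For debezium-json with a non-empty key, DebeziumSchemaUtils.extractPrimaryKeys copies the
    //    key's field names into the value as "pkNames" via JsonSerdeUtil.appendArrayNode.
    // 3. DebeziumRecordParser.setRoot stashes that array and serves it from extractPrimaryKeys(),
    //    so table sync discovers primary keys even when the payload carries none.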