From 4db2e9a676e07c77f0b2dfae8edb0f0a643aa213 Mon Sep 17 00:00:00 2001
From: monster <60029759+MonsterChenzhuo@users.noreply.github.com>
Date: Tue, 7 Nov 2023 16:06:18 +0800
Subject: [PATCH] [flink][kafka-cdc] Support table debezium json format (#2251)

---
 .../apache/paimon/utils/JsonSerdeUtil.java    |  36 ++++
 .../flink/action/cdc/format/DataFormat.java   |   4 +-
 .../format/debezium/DebeziumRecordParser.java | 143 ++++++++++++++
 .../JsonPrimaryKeyDeserializationSchema.java  |  67 +++++++
 .../action/cdc/kafka/KafkaActionUtils.java    |  43 ++++-
 .../cdc/kafka/KafkaSyncTableAction.java       |   4 +-
 .../KafkaDebeziumSyncTableActionITCase.java   | 178 ++++++++++++++++++
 .../table/computedcolumn/debezium-data-1.txt  |  19 ++
 .../table/schemaevolution/debezium-data-1.txt |  20 ++
 .../table/schemaevolution/debezium-data-2.txt |  20 ++
 .../table/schemaevolution/debezium-data-3.txt |  22 +++
 11 files changed, 543 insertions(+), 13 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
 create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 3b49ffd6d5de..803f4f3da4c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -35,11 +35,14 @@
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -219,6 +222,39 @@ public static <T> JsonNode toTree(T value) {
         return OBJECT_MAPPER_INSTANCE.valueToTree(value);
     }
 
+    /**
+     * Adds an array of values to a JSON string under the specified key.
+     *
+     * @param origin The original JSON string.
+     * @param key The key under which the values will be added as an array.
+     * @param values A list of values to be added to the JSON string.
+     * @return The JSON string with the added array. If the JSON string is not a valid JSON object,
+     *     or if the list of values is empty or null, the original JSON string will be returned.
+     * @throws RuntimeException If an error occurs while parsing the JSON string or adding the
+     *     values.
+     */
+    public static String putArrayToJsonString(String origin, String key, List<String> values) {
+        if (values == null || values.isEmpty()) {
+            return origin;
+        }
+
+        try {
+            JsonNode jsonNode = OBJECT_MAPPER_INSTANCE.readTree(origin);
+            if (jsonNode.isObject()) {
+                ObjectNode objectNode = (ObjectNode) jsonNode;
+                ArrayNode arrayNode = objectNode.putArray(key);
+                for (String value : values) {
+                    arrayNode.add(value);
+                }
+                return OBJECT_MAPPER_INSTANCE.writeValueAsString(objectNode);
+            } else {
+                return origin;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to add array to JSON", e);
+        }
+    }
+
     public static boolean isNull(JsonNode jsonNode) {
         return jsonNode == null || jsonNode.isNull();
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
index fd5f9960200b..28dc3e457c09 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.canal.CanalRecordParser;
+import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser;
 import org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellRecordParser;
 import org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser;
 
@@ -36,7 +37,8 @@ public enum DataFormat {
     CANAL_JSON(CanalRecordParser::new),
     OGG_JSON(OggRecordParser::new),
-    MAXWELL_JSON(MaxwellRecordParser::new);
+    MAXWELL_JSON(MaxwellRecordParser::new),
+    DEBEZIUM_JSON(DebeziumRecordParser::new);
     // Add more data formats here if needed
 
     private final RecordParserFactory parser;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
new file mode 100644
index 000000000000..b1d858bf4e82
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.RecordParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
+
+/**
+ * The {@code DebeziumRecordParser} class extends the abstract {@link RecordParser} and is designed
+ * to parse records from Debezium's JSON change data capture (CDC) format. Debezium is a CDC
+ * solution for MySQL databases that captures row-level changes to database tables and outputs them
+ * in JSON format. This parser extracts relevant information from the Debezium-JSON format and
+ * converts it into a list of {@link RichCdcMultiplexRecord} objects.
+ *
+ * <p>The class supports various database operations such as INSERT, UPDATE, DELETE, and READ
+ * (snapshot reads), and creates corresponding {@link RichCdcMultiplexRecord} objects to represent
+ * these changes.
+ *
+ * <p>Validation is performed to ensure that the JSON records contain all necessary fields,
+ * including the 'before' and 'after' states for UPDATE operations, and the class also supports
+ * schema extraction for the Kafka topic. Debezium's specific fields such as 'source', 'op' for
+ * operation type, and primary key field names are used to construct the details of each record
+ * event.
+ */
+public class DebeziumRecordParser extends RecordParser {
+
+    private static final String FIELD_BEFORE = "before";
+    private static final String FIELD_AFTER = "after";
+    private static final String FIELD_SOURCE = "source";
+    private static final String FIELD_PRIMARY = "pkNames";
+    private static final String FIELD_DB = "db";
+    private static final String FIELD_TYPE = "op";
+    private static final String OP_INSERT = "c";
+    private static final String OP_UPDATE = "u";
+    private static final String OP_DELETE = "d";
+    private static final String OP_READE = "r";
+
+    public DebeziumRecordParser(
+            boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+        super(caseSensitive, typeMapping, computedColumns);
+    }
+
+    @Override
+    public List<RichCdcMultiplexRecord> extractRecords() {
+        String operation = extractStringFromRootJson(FIELD_TYPE);
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        switch (operation) {
+            case OP_INSERT:
+            case OP_READE:
+                processRecord(root.get(dataField()), RowKind.INSERT, records);
+                break;
+            case OP_UPDATE:
+                processRecord(
+                        mergeOldRecord(root.get(dataField()), root.get(FIELD_BEFORE)),
+                        RowKind.DELETE,
+                        records);
+                processRecord(root.get(dataField()), RowKind.INSERT, records);
+                break;
+            case OP_DELETE:
+                processRecord(root.get(FIELD_BEFORE), RowKind.DELETE, records);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown record operation: " + operation);
+        }
+        return records;
+    }
+
+    @Override
+    protected void validateFormat() {
+        String errorMessageTemplate =
+                "Didn't find '%s' node in json. Please make sure your topic's format is correct.";
+        checkArgument(
+                !isNull(root.get(FIELD_SOURCE).get(FIELD_TABLE)),
+                errorMessageTemplate,
+                FIELD_TABLE);
+        checkArgument(
+                !isNull(root.get(FIELD_SOURCE).get(FIELD_DB)),
+                errorMessageTemplate,
+                FIELD_DATABASE);
+        checkArgument(!isNull(root.get(FIELD_TYPE)), errorMessageTemplate, FIELD_TYPE);
+        String operation = root.get(FIELD_TYPE).asText();
+        switch (operation) {
+            case OP_INSERT:
+            case OP_READE:
+                checkArgument(!isNull(root.get(dataField())), errorMessageTemplate, dataField());
+                break;
+            case OP_UPDATE:
+            case OP_DELETE:
+                checkArgument(!isNull(root.get(FIELD_BEFORE)), errorMessageTemplate, FIELD_BEFORE);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported operation type: " + operation);
+        }
+        checkArgument(!isNull(root.get(primaryField())), errorMessageTemplate, primaryField());
+    }
+
+    @Override
+    protected String primaryField() {
+        return FIELD_PRIMARY;
+    }
+
+    @Override
+    protected String dataField() {
+        return FIELD_AFTER;
+    }
+
+    @Override
+    protected String extractStringFromRootJson(String key) {
+        if (key.equals(FIELD_TABLE)) {
+            tableName = root.get(FIELD_SOURCE).get(FIELD_TABLE).asText();
+            return tableName;
+        } else if (key.equals(FIELD_DATABASE)) {
+            databaseName = root.get(FIELD_SOURCE).get(FIELD_DB).asText();
+            return databaseName;
+        }
+        return root.get(key) != null ? root.get(key).asText() : null;
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java
new file mode 100644
index 000000000000..e854a9bb74a3
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * This class is used to deserialize byte[] messages into String format, and then add primary key
+ * fields to the JSON string.
+ */
+public class JsonPrimaryKeyDeserializationSchema implements DeserializationSchema<String> {
+
+    public static final String PRIMARY_KEY_NAMES = "pkNames";
+    private final List<String> primaryKeyNames;
+
+    public JsonPrimaryKeyDeserializationSchema(List<String> primaryKeyNames) {
+        this.primaryKeyNames = checkNotNull(primaryKeyNames);
+        if (this.primaryKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("primary key must not be empty");
+        }
+    }
+
+    @Override
+    public String deserialize(byte[] message) {
+        try {
+            String value = new String(message, StandardCharsets.UTF_8);
+            return JsonSerdeUtil.putArrayToJsonString(value, PRIMARY_KEY_NAMES, primaryKeyNames);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to deserialize message", e);
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(String nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return BasicTypeInfo.STRING_TYPE_INFO;
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 5f3c98f9b8cf..8dceaeb5de68 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -20,6 +20,8 @@
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.debezium.JsonPrimaryKeyDeserializationSchema;
+import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -34,7 +36,6 @@
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -54,6 +55,7 @@
 import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -65,8 +67,14 @@ public class KafkaActionUtils {
     private static final String PARTITION = "partition";
     private static final String OFFSET = "offset";
+    private static final String DEBEZIUM_JSON = "debezium-json";
 
     public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
+        return buildKafkaSource(kafkaConfig, new ArrayList<>());
+    }
+
+    public static KafkaSource<String> buildKafkaSource(
+            Configuration kafkaConfig, List<String> primaryKeys) {
         validateKafkaConfig(kafkaConfig);
 
         KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
@@ -77,8 +85,11 @@ public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
         kafkaSourceBuilder
                 .setTopics(topics)
-                .setValueOnlyDeserializer(new SimpleStringSchema())
-                .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+                .setGroupId(kafkaPropertiesGroupId(kafkaConfig))
+                .setValueOnlyDeserializer(
+                        DEBEZIUM_JSON.equals(kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT))
+                                ? new JsonPrimaryKeyDeserializationSchema(primaryKeys)
+                                : new SimpleStringSchema());
 
         Properties properties = new Properties();
         for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet()) {
             String key = entry.getKey();
@@ -262,6 +273,11 @@ static DataFormat getDataFormat(Configuration kafkaConfig) {
     static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
             Configuration kafkaConfig, String topic) {
+        return getKafkaEarliestConsumer(kafkaConfig, topic, new ArrayList<>());
+    }
+
+    static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
+            Configuration kafkaConfig, String topic, List<String> primaryKeys) {
         Properties props = new Properties();
         props.put(
                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -286,26 +302,33 @@ static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
                 Collections.singletonList(new TopicPartition(topic, firstPartition));
         consumer.assign(topicPartitions);
         consumer.seekToBeginning(topicPartitions);
-
-        return new KafkaConsumerWrapper(consumer);
+        return new KafkaConsumerWrapper(
+                consumer,
+                DEBEZIUM_JSON.equals(kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT))
+                        ? primaryKeys
+                        : new ArrayList<>());
     }
 
     private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper {
 
+        private static final String PK_NAMES_KEY = "pkNames";
+
         private final KafkaConsumer<String, String> consumer;
 
-        KafkaConsumerWrapper(KafkaConsumer<String, String> kafkaConsumer) {
+        private final List<String> pkNames;
+
+        KafkaConsumerWrapper(KafkaConsumer<String, String> kafkaConsumer, List<String> pkNames) {
             this.consumer = kafkaConsumer;
+            this.pkNames = pkNames;
         }
 
         @Override
         public List<String> getRecords(String topic, int pollTimeOutMills) {
             ConsumerRecords<String, String> consumerRecords =
                     consumer.poll(Duration.ofMillis(pollTimeOutMills));
-            Iterable<ConsumerRecord<String, String>> records = consumerRecords.records(topic);
-            List<String> result = new ArrayList<>();
-            records.forEach(r -> result.add(r.value()));
-            return result;
+            return StreamSupport.stream(consumerRecords.records(topic).spliterator(), false)
+                    .map(r -> JsonSerdeUtil.putArrayToJsonString(r.value(), PK_NAMES_KEY, pkNames))
+                    .collect(Collectors.toList());
         }
 
         @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index badbb518ab10..0f5ce4f1b3f2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -41,7 +41,7 @@ public KafkaSyncTableAction(
 
     @Override
     protected Source<String, ?, ?> buildSource() {
-        return KafkaActionUtils.buildKafkaSource(mqConfig);
+        return KafkaActionUtils.buildKafkaSource(mqConfig, primaryKeys);
     }
 
     @Override
@@ -51,7 +51,7 @@ protected String topic() {
 
     @Override
     protected MessageQueueSchemaUtils.ConsumerWrapper consumer(String topic) {
-        return KafkaActionUtils.getKafkaEarliestConsumer(mqConfig, topic);
+        return KafkaActionUtils.getKafkaEarliestConsumer(mqConfig, topic, primaryKeys);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
new file mode 100644
index 000000000000..f73734df30bb
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** IT cases for {@link KafkaSyncTableAction} with Debezium JSON format. */
+public class KafkaDebeziumSyncTableActionITCase extends KafkaActionITCaseBase {
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolution() throws Exception {
+        runSingleTableSchemaEvolution("schemaevolution");
+    }
+
+    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
+        final String topic = "schema_evolution";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the debezium json into Kafka -------------------
+        List<String> lines =
+                readLines(String.format("kafka/debezium/table/%s/debezium-data-1.txt", sourceDir));
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "debezium-json");
+        kafkaConfig.put("topic", topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testSchemaEvolutionImpl(topic, sourceDir);
+    }
+
+    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        try {
+            writeRecordsToKafka(
+                    topic,
+                    readLines(
+                            String.format(
+                                    "kafka/debezium/table/%s/debezium-data-2.txt", sourceDir)));
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", "age"});
+        expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
+                        "+I[102, car battery, 12V car battery, 8.1, NULL]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        try {
+            writeRecordsToKafka(
+                    topic,
+                    readLines(
+                            String.format(
+                                    "kafka/debezium/table/%s/debezium-data-3.txt", sourceDir)));
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", "age", "address"});
+        expected =
+                Arrays.asList(
+                        "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]",
+                        "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]",
+                        "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testComputedColumn() throws Exception {
+        String topic = "computed_column";
+        createTestTopic(topic, 1, 1);
+
+        List<String> lines = readLines("kafka/debezium/table/computedcolumn/debezium-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "debezium-json");
+        kafkaConfig.put("topic", topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withComputedColumnArgs("_year=year(date)")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.INT()
+                        },
+                        new String[] {"id", "date", "_year"});
+        waitForResult(
+                Collections.singletonList("+I[101, 2023-03-23, 2023]"),
+                getFileStoreTable(tableName),
+                rowType,
+                Collections.singletonList("id"));
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt
new file mode 100644
index 000000000000..5a571d3fd383
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 101, "date": "2023-03-23"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt
new file mode 100644
index 000000000000..b3ff4e23a3fb
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
+{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt
new file mode 100644
index 000000000000..528f96522314
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 103, "name": "12-pack drill bits", "description": "12-pack of drill bits with sizes ranging from #40 to #3", "weight": 0.8, "age": 18}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
+{"before": null, "after": {"id": 104, "name": "hammer", "description": "12oz carpenter's hammer", "weight": 0.75, "age": 24}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt
new file mode 100644
index 000000000000..79210f88d9a8
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 105, "name": "hammer", "description": "14oz carpenter's hammer", "weight": 0.875, "address": "Shanghai"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
+{"before": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "after": null, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "d", "ts_ms": 1596684883000, "transaction": null}
+{"before": {"address": "Shanghai"}, "after": {"id": 105, "name": "hammer", "description": "14oz carpenter's hammer", "weight": 0.875, "address": "Beijing"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684906000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "u", "ts_ms": 1596684906000, "transaction": null}
+{"before": null, "after": {"id": 107, "name": "rocks", "description": "box of assorted rocks", "weight": 5.3}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
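
Note for reviewers: a minimal sketch of what the new debezium-json value path does to every Kafka record before it reaches DebeziumRecordParser. The configured primary keys are appended under the "pkNames" key via the JsonSerdeUtil.putArrayToJsonString helper added in this patch; the wrapping main method and class name below are illustrative only and assume the patched paimon-core is on the classpath.

import java.util.Collections;

import org.apache.paimon.utils.JsonSerdeUtil;

public class PkNamesEnrichmentExample {
    public static void main(String[] args) {
        // A (shortened) Debezium JSON value as read from the Kafka topic.
        String value = "{\"before\": null, \"after\": {\"id\": 101}, \"op\": \"c\"}";
        // JsonPrimaryKeyDeserializationSchema performs this call for each record;
        // DebeziumRecordParser later reads the keys back through primaryField() ("pkNames").
        String enriched =
                JsonSerdeUtil.putArrayToJsonString(
                        value, "pkNames", Collections.singletonList("id"));
        // Prints: {"before":null,"after":{"id":101},"op":"c","pkNames":["id"]}
        System.out.println(enriched);
    }
}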