From a5570d837d847c6303441d14c3e8fbb747b7093c Mon Sep 17 00:00:00 2001 From: HZY <48040205+Alibaba-HZY@users.noreply.github.com> Date: Tue, 20 Jun 2023 15:57:25 +0800 Subject: [PATCH] [flink] Supports Kafka with canal CDC format database ingestion (#1200) --- docs/content/how-to/cdc-ingestion.md | 75 +- .../generated/kafka_sync_database.html | 73 ++ .../flink/action/cdc/kafka/KafkaSchema.java | 155 ++-- .../cdc/kafka/KafkaSyncDatabaseAction.java | 457 ++++++++++++ .../cdc/kafka/KafkaSyncTableAction.java | 2 +- .../cdc/kafka/canal/CanalJsonEventParser.java | 102 +-- ...seBase.java => KafkaActionITCaseBase.java} | 4 +- .../KafkaCanalSyncDatabaseActionITCase.java | 672 ++++++++++++++++++ .../KafkaCanalSyncTableActionITCase.java | 2 +- .../cdc/kafka/KafkaEventParserTest.java | 57 +- .../action/cdc/kafka/KafkaSchemaTest.java | 5 +- .../database/include/topic0/canal-data-1.txt | 22 + .../prefixsuffix/topic0/canal-data-1.txt | 20 + .../prefixsuffix/topic0/canal-data-2.txt | 21 + .../prefixsuffix/topic0/canal-data-3.txt | 20 + .../prefixsuffix/topic1/canal-data-1.txt | 20 + .../prefixsuffix/topic1/canal-data-2.txt | 21 + .../prefixsuffix/topic1/canal-data-3.txt | 20 + .../schemaevolution/topic0/canal-data-1.txt | 20 + .../schemaevolution/topic0/canal-data-2.txt | 21 + .../schemaevolution/topic0/canal-data-3.txt | 20 + .../schemaevolution/topic1/canal-data-1.txt | 20 + .../schemaevolution/topic1/canal-data-2.txt | 21 + .../schemaevolution/topic1/canal-data-3.txt | 20 + .../schemaevolution/topic2/canal-data-1.txt | 19 + .../schemaevolution/topic2/canal-data-2.txt | 18 + .../schemaevolution/topic2/canal-data-3.txt | 18 + 27 files changed, 1761 insertions(+), 164 deletions(-) create mode 100644 docs/layouts/shortcodes/generated/kafka_sync_database.html create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/{KafkaCanalActionITCaseBase.java => KafkaActionITCaseBase.java} (99%) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/include/topic0/canal-data-1.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-1.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-2.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-3.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-1.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-2.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-3.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-1.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-2.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-3.txt create mode 100644 
paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-1.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-2.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-3.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-1.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-2.txt create mode 100644 paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-3.txt diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md index 9f1039c76415..3f491606700d 100644 --- a/docs/content/how-to/cdc-ingestion.md +++ b/docs/content/how-to/cdc-ingestion.md @@ -27,14 +27,15 @@ under the License. # CDC Ingestion Paimon supports a variety of ways to ingest data into Paimon tables with schema evolution. This means that the added -columns are synchronized to the Paimon table in real time and the synchronization job will not restarted for this purpose. +columns are synchronized to the Paimon table in real time and the synchronization job will not be restarted for this purpose. We currently support the following sync ways: 1. MySQL Synchronizing Table: synchronize one or multiple tables from MySQL into one Paimon table. 2. MySQL Synchronizing Database: synchronize the whole MySQL database into one Paimon database. 3. [API Synchronizing Table]({{< ref "/api/flink-api#cdc-ingestion-table" >}}): synchronize your custom DataStream input into one Paimon table. -4. Kafka Synchronizing Table: synchronize one Kafka topic's table into one Paimon table. +4. Kafka Synchronizing Table: synchronize one Kafka topic's table into one Paimon table. +5. Kafka Synchronizing Database: synchronize one Kafka topic containing multiple tables or multiple topics containing one table each into one Paimon database. ## MySQL @@ -276,6 +277,76 @@ Example --table-conf changelog-producer=input \ --table-conf sink.parallelism=4 ``` +### Synchronizing Databases + +By using [KafkaSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize one topic or multiple topics into one Paimon database. + +To use this feature through `flink run`, run the following shell command. + +```bash +<FLINK_HOME>/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + kafka-sync-database \ + --warehouse <warehouse-path> \ + --database <database-name> \ + [--schema-init-max-read <schema-init-max-read>] \ + [--ignore-incompatible <true/false>] \ + [--table-prefix <paimon-table-prefix>] \ + [--table-suffix <paimon-table-suffix>] \ + [--including-tables <table-name|name-regular-expr>] \ + [--excluding-tables <table-name|name-regular-expr>] \ + [--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \ + [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \ + [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]] +``` + +{{< generated/kafka_sync_database >}} + +Only tables with primary keys will be synchronized. + +For each Kafka topic's table to be synchronized, if the corresponding Paimon table does not exist, this action will automatically create the table. +Its schema will be derived from all specified Kafka topic's tables; the schema is obtained by parsing the earliest non-DDL records in the topic. If the Paimon table already exists, its schema will be compared against the schema of all specified Kafka topic's tables. 
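The action can also be embedded in your own Flink DataStream program. Below is a minimal sketch, not part of this patch, condensed from the usage in the new `KafkaCanalSyncDatabaseActionITCase`; the class name, checkpoint interval, and configuration values are illustrative, and the sketch assumes it is compiled in the action's own package because the constructors added in this patch are package-private.

```java
// A minimal sketch, not part of this patch: mirrors how the new IT cases drive the action.
// Placed in the action's package because the constructors added here are package-private.
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSyncDatabaseExample {
    public static void main(String[] args) throws Exception {
        // Kafka source options, same keys as the --kafka-conf flags.
        Map<String, String> kafkaConfig = new HashMap<>();
        kafkaConfig.put("properties.bootstrap.servers", "127.0.0.1:9020");
        kafkaConfig.put("topic", "order");
        kafkaConfig.put("properties.group.id", "123456");
        kafkaConfig.put("value.format", "canal-json");

        // Paimon table sink options, same keys as the --table-conf flags.
        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put("bucket", "4");
        tableConfig.put("changelog-producer", "input");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        KafkaSyncDatabaseAction action =
                new KafkaSyncDatabaseAction(
                        kafkaConfig,
                        "hdfs:///path/to/warehouse", // --warehouse
                        "test_db",                   // --database
                        false,                       // --ignore-incompatible
                        Collections.emptyMap(),      // --catalog-conf entries
                        tableConfig);                // --table-conf entries

        // Registers the Kafka source and one Paimon sink per discovered table, then runs the job.
        action.build(env);
        env.execute("KAFKA-Paimon Database Sync: test_db");
    }
}
```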
+ +Example + +Synchronization from one Kafka topic to Paimon database. + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + kafka-sync-database \ + --warehouse hdfs:///path/to/warehouse \ + --database test_db \ + --schema-init-max-read 500 \ + --kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \ + --kafka-conf topic=order \ + --kafka-conf properties.group.id=123456 \ + --kafka-conf value.format=canal-json \ + --catalog-conf metastore=hive \ + --catalog-conf uri=thrift://hive-metastore:9083 \ + --table-conf bucket=4 \ + --table-conf changelog-producer=input \ + --table-conf sink.parallelism=4 +``` + +Synchronization from multiple Kafka topics to Paimon database. + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + kafka-sync-database \ + --warehouse hdfs:///path/to/warehouse \ + --database test_db \ + --kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \ + --kafka-conf topic=order,logistic_order,user \ + --kafka-conf properties.group.id=123456 \ + --kafka-conf value.format=canal-json \ + --catalog-conf metastore=hive \ + --catalog-conf uri=thrift://hive-metastore:9083 \ + --table-conf bucket=4 \ + --table-conf changelog-producer=input \ + --table-conf sink.parallelism=4 +``` ## Schema Change Evolution diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html new file mode 100644 index 000000000000..226a28e7f5ae --- /dev/null +++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html @@ -0,0 +1,73 @@ +{{/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/}} +{{ $ref := ref . "maintenance/configurations.md" }} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Configuration
Description
--warehouse
The path to Paimon warehouse.
--database
The database name in Paimon catalog.
--schema-init-max-read
If all your tables come from a single topic, this parameter sets the maximum number of records read from the topic when initializing the schemas of the tables to be synchronized. The default value is 1000.
--ignore-incompatible
It is false by default. In this case, if a table from the Kafka topic already exists in Paimon and their schemas are incompatible, an exception will be thrown. You can specify it as true explicitly to ignore the incompatible tables and the exception.
--table-prefix
The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table-prefix ods_".
--table-suffix
The suffix of all Paimon tables to be synchronized. The usage is the same as "--table-prefix".
--including-tables
It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables. Because '|' is a special character, a comma is required, for example: 'a|b|c'. Regular expressions are supported; for example, specifying "--including-tables test|paimon.*" means to synchronize the table 'test' and all tables starting with 'paimon'.
--excluding-tables
It is used to specify which source tables are not to be synchronized. The usage is the same as "--including-tables". "--excluding-tables" has higher priority than "--including-tables" if you specify both.
--kafka-conf
The configuration for the Flink Kafka source. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic`, `properties.group.id`, and `value.format` are required configurations; others are optional. See the Flink Kafka connector documentation for a complete list of configurations.
--catalog-conf
The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations.
--table-conf
The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations.
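For schema discovery, the required `--kafka-conf` entries are turned into a plain Kafka consumer that reads the earliest records of each topic (see `KafkaSchema.getKafkaSchema` and `getListKafkaSchema` in this patch). Below is a minimal, self-contained sketch of an equivalent consumer setup using only the standard Kafka client API; the class name and group id are illustrative, and the patch's actual helper (`getKafkaEarliestConsumer`) is not reproduced here.

```java
// A sketch of the kind of earliest-offset consumer used for schema discovery; illustrative only.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SchemaDiscoverySketch {
    public static void main(String[] args) {
        // Values correspond to --kafka-conf properties.bootstrap.servers / properties.group.id / topic.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9020");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "schema-discovery");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // sample from the beginning
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("order"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // Each value is a canal-json payload; the action derives the Paimon schema from the
                // first non-DDL record's "mysqlType" field types and primary keys, together with
                // the "database" and "table" names.
                System.out.println(record.value());
            }
        }
    }
}
```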
\ No newline at end of file diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java index e716a9211818..3fc4015ba130 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java @@ -26,6 +26,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.commons.compress.utils.Lists; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -41,6 +42,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.UUID; @@ -48,48 +50,22 @@ public class KafkaSchema { private static final int MAX_RETRY = 100; + private static final int MAX_READ = 1000; - private final ObjectMapper objectMapper = new ObjectMapper(); - - private String databaseName; - private String tableName; + private final String databaseName; + private final String tableName; private final Map fields; private final List primaryKeys; - public KafkaSchema(Configuration kafkaConfig, String topic) throws Exception { - - fields = new LinkedHashMap<>(); - primaryKeys = new ArrayList<>(); - KafkaConsumer consumer = getKafkaEarliestConsumer(kafkaConfig); - - consumer.subscribe(Collections.singletonList(topic)); - - int retry = 0; - boolean success = false; - while (!success) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - try { - String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT); - if ("canal-json".equals(format)) { - success = parseCanalJson(record.value()); - if (success) { - break; - } - } else { - throw new UnsupportedOperationException( - "This format: " + format + " is not support."); - } - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - if (!success && retry == MAX_RETRY) { - throw new Exception("Could not get metadata from server,topic :" + topic); - } - Thread.sleep(100); - retry++; - } + public KafkaSchema( + String databaseName, + String tableName, + Map fields, + List primaryKeys) { + this.databaseName = databaseName; + this.tableName = tableName; + this.fields = fields; + this.primaryKeys = primaryKeys; } public String tableName() { @@ -132,7 +108,12 @@ private static String extractJsonNode(JsonNode record, String key) { return record != null && record.get(key) != null ? 
record.get(key).asText() : null; } - private boolean parseCanalJson(String record) throws JsonProcessingException { + private static KafkaSchema parseCanalJson(String record, ObjectMapper objectMapper) + throws JsonProcessingException { + String databaseName; + String tableName; + final Map fields = new LinkedHashMap<>(); + final List primaryKeys = new ArrayList<>(); JsonNode root = objectMapper.readValue(record, JsonNode.class); if (!extractIsDDL(root)) { JsonNode mysqlType = root.get("mysqlType"); @@ -151,8 +132,102 @@ private boolean parseCanalJson(String record) throws JsonProcessingException { } databaseName = extractJsonNode(root, "database"); tableName = extractJsonNode(root, "table"); + return new KafkaSchema(databaseName, tableName, fields, primaryKeys); + } + return null; + } + + public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String topic) + throws Exception { + KafkaConsumer consumer = getKafkaEarliestConsumer(kafkaConfig); + + consumer.subscribe(Collections.singletonList(topic)); + KafkaSchema kafkaSchema; + int retry = 0; + ObjectMapper objectMapper = new ObjectMapper(); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + try { + String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT); + if ("canal-json".equals(format)) { + kafkaSchema = parseCanalJson(record.value(), objectMapper); + if (kafkaSchema != null) { + return kafkaSchema; + } + } else { + throw new UnsupportedOperationException( + "This format: " + format + " is not support."); + } + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + if (retry == MAX_RETRY) { + throw new Exception("Could not get metadata from server,topic :" + topic); + } + Thread.sleep(100); + retry++; + } + } + + public static List getListKafkaSchema( + Configuration kafkaConfig, String topic, int maxRead) throws Exception { + KafkaConsumer consumer = getKafkaEarliestConsumer(kafkaConfig); + + consumer.subscribe(Collections.singletonList(topic)); + List kafkaSchemaList = Lists.newArrayList(); + int retry = 0; + int read = 0; + maxRead = maxRead == 0 ? 
MAX_READ : maxRead; + ObjectMapper objectMapper = new ObjectMapper(); + while (maxRead > read) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + read++; + try { + String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT); + if ("canal-json".equals(format)) { + KafkaSchema kafkaSchema = parseCanalJson(record.value(), objectMapper); + if (kafkaSchema != null && !kafkaSchemaList.contains(kafkaSchema)) { + kafkaSchemaList.add(kafkaSchema); + } + } else { + throw new UnsupportedOperationException( + "This format: " + format + " is not support."); + } + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + if (kafkaSchemaList.isEmpty() && retry == MAX_RETRY) { + throw new Exception("Could not get metadata from server,topic :" + topic); + } else if (!kafkaSchemaList.isEmpty() && retry == MAX_RETRY) { + break; + } + Thread.sleep(100); + retry++; + } + return kafkaSchemaList; + } + + @Override + public boolean equals(Object o) { + if (this == o) { return true; } - return false; + if (!(o instanceof KafkaSchema)) { + return false; + } + KafkaSchema that = (KafkaSchema) o; + return databaseName.equals(that.databaseName) + && tableName.equals(that.tableName) + && fields.equals(that.fields) + && primaryKeys.equals(that.primaryKeys); + } + + @Override + public int hashCode() { + return Objects.hash(databaseName, tableName, fields, primaryKeys); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java new file mode 100644 index 000000000000..a4e48b12e0dc --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.action.Action; +import org.apache.paimon.flink.action.ActionBase; +import org.apache.paimon.flink.action.cdc.TableNameConverter; +import org.apache.paimon.flink.action.cdc.kafka.canal.CanalJsonEventParser; +import org.apache.paimon.flink.sink.cdc.EventParser; +import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions; +import org.apache.flink.util.CollectionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.paimon.flink.action.Action.optionalConfigMap; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * An {@link Action} which synchronizes multiple topics into one Paimon database. + * + *

You should specify the Kafka source topic in {@code kafkaConfig}. See the documentation + * of flink-connectors for detailed keys and values. + * + *

For each topic's table to be synchronized, if the corresponding Paimon table does not exist, + * this action will automatically create the table. Its schema will be derived from all specified + * tables. If the Paimon table already exists, its schema will be compared against the schema of all + * specified tables. + * + *

This action supports a limited number of schema changes. Currently, the framework cannot drop + * columns, so the behavior of `DROP` will be ignored, and `RENAME` will add a new column. Currently + * supported schema changes include: + * + *

    + *
  • Adding columns. + *
  • Altering column types. More specifically, + *
      + *
    • altering from a string type (char, varchar, text) to another string type with longer + * length, + *
    • altering from a binary type (binary, varbinary, blob) to another binary type with + * longer length, + *
    • altering from an integer type (tinyint, smallint, int, bigint) to another integer + * type with wider range, + *
    • altering from a floating-point type (float, double) to another floating-point type + * with wider range, + *
    + * are supported. + *
+ * + *

This action creates a Paimon table sink for each Paimon table to be written, so this action is + * not very efficient in resource saving. We may optimize this action by merging all sinks into one + * instance in the future. + */ +public class KafkaSyncDatabaseAction extends ActionBase { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSyncDatabaseAction.class); + + private final Configuration kafkaConfig; + private final String database; + private final int schemaInitMaxRead; + private final boolean ignoreIncompatible; + private final String tablePrefix; + private final String tableSuffix; + @Nullable private final Pattern includingPattern; + @Nullable private final Pattern excludingPattern; + private final Map tableConfig; + + KafkaSyncDatabaseAction( + Map kafkaConfig, + String warehouse, + String database, + boolean ignoreIncompatible, + Map catalogConfig, + Map tableConfig) { + this( + kafkaConfig, + warehouse, + database, + 0, + ignoreIncompatible, + null, + null, + null, + null, + catalogConfig, + tableConfig); + } + + KafkaSyncDatabaseAction( + Map kafkaConfig, + String warehouse, + String database, + int schemaInitMaxRead, + boolean ignoreIncompatible, + @Nullable String tablePrefix, + @Nullable String tableSuffix, + @Nullable String includingTables, + @Nullable String excludingTables, + Map catalogConfig, + Map tableConfig) { + super(warehouse, catalogConfig); + this.kafkaConfig = Configuration.fromMap(kafkaConfig); + this.database = database; + this.schemaInitMaxRead = schemaInitMaxRead; + this.ignoreIncompatible = ignoreIncompatible; + this.tablePrefix = tablePrefix == null ? "" : tablePrefix; + this.tableSuffix = tableSuffix == null ? "" : tableSuffix; + this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables); + this.excludingPattern = excludingTables == null ? 
null : Pattern.compile(excludingTables); + this.tableConfig = tableConfig; + } + + public void build(StreamExecutionEnvironment env) throws Exception { + checkArgument( + kafkaConfig.contains(KafkaConnectorOptions.VALUE_FORMAT), + KafkaConnectorOptions.VALUE_FORMAT.key() + " cannot be null."); + checkArgument( + !CollectionUtil.isNullOrEmpty(kafkaConfig.get(KafkaConnectorOptions.TOPIC)), + KafkaConnectorOptions.TOPIC.key() + " cannot be null."); + + boolean caseSensitive = catalog.caseSensitive(); + + if (!caseSensitive) { + validateCaseInsensitive(); + } + + Map> kafkaCanalSchemaMap = getKafkaCanalSchemaMap(); + + catalog.createDatabase(database, true); + TableNameConverter tableNameConverter = + new TableNameConverter(caseSensitive, tablePrefix, tableSuffix); + + List fileStoreTables = new ArrayList<>(); + List monitoredTopics = new ArrayList<>(); + for (Map.Entry> kafkaCanalSchemaEntry : + kafkaCanalSchemaMap.entrySet()) { + List kafkaSchemaList = kafkaCanalSchemaEntry.getValue(); + String topic = kafkaCanalSchemaEntry.getKey(); + for (KafkaSchema kafkaSchema : kafkaSchemaList) { + String paimonTableName = tableNameConverter.convert(kafkaSchema.tableName()); + Identifier identifier = new Identifier(database, paimonTableName); + FileStoreTable table; + Schema fromCanal = + KafkaActionUtils.buildPaimonSchema( + kafkaSchema, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + tableConfig, + caseSensitive); + try { + table = (FileStoreTable) catalog.getTable(identifier); + Supplier errMsg = + incompatibleMessage(table.schema(), kafkaSchema, identifier); + if (shouldMonitorTable(table.schema(), fromCanal, errMsg)) { + monitoredTopics.add(topic); + fileStoreTables.add(table); + } + } catch (Catalog.TableNotExistException e) { + catalog.createTable(identifier, fromCanal, false); + table = (FileStoreTable) catalog.getTable(identifier); + monitoredTopics.add(topic); + fileStoreTables.add(table); + } + } + } + monitoredTopics = monitoredTopics.stream().distinct().collect(Collectors.toList()); + Preconditions.checkState( + !fileStoreTables.isEmpty(), + "No tables to be synchronized. Possible cause is the schemas of all tables in specified " + + "Kafka topic's table are not compatible with those of existed Paimon tables. 
Please check the log."); + + kafkaConfig.set(KafkaConnectorOptions.TOPIC, monitoredTopics); + KafkaSource source = KafkaActionUtils.buildKafkaSource(kafkaConfig); + + EventParser.Factory parserFactory; + String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT); + if ("canal-json".equals(format)) { + parserFactory = () -> new CanalJsonEventParser(caseSensitive, tableNameConverter); + } else { + throw new UnsupportedOperationException("This format: " + format + " is not support."); + } + FlinkCdcSyncDatabaseSinkBuilder sinkBuilder = + new FlinkCdcSyncDatabaseSinkBuilder() + .withInput( + env.fromSource( + source, WatermarkStrategy.noWatermarks(), "Kafka Source")) + .withParserFactory(parserFactory) + .withTables(fileStoreTables); + String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key()); + if (sinkParallelism != null) { + sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism)); + } + sinkBuilder.build(); + } + + private void validateCaseInsensitive() { + checkArgument( + database.equals(database.toLowerCase()), + String.format( + "Database name [%s] cannot contain upper case in case-insensitive catalog.", + database)); + checkArgument( + tablePrefix.equals(tablePrefix.toLowerCase()), + String.format( + "Table prefix [%s] cannot contain upper case in case-insensitive catalog.", + tablePrefix)); + checkArgument( + tableSuffix.equals(tableSuffix.toLowerCase()), + String.format( + "Table suffix [%s] cannot contain upper case in case-insensitive catalog.", + tableSuffix)); + } + + private Map> getKafkaCanalSchemaMap() throws Exception { + Map> kafkaCanalSchemaMap = new HashMap<>(); + List topicList = kafkaConfig.get(KafkaConnectorOptions.TOPIC); + if (topicList.size() > 1) { + topicList.forEach( + topic -> { + try { + KafkaSchema kafkaSchema = + KafkaSchema.getKafkaSchema(kafkaConfig, topic); + if (shouldMonitorTable(kafkaSchema.tableName())) { + kafkaCanalSchemaMap.put( + topic, Collections.singletonList(kafkaSchema)); + } + + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } else { + List kafkaSchemaList = + KafkaSchema.getListKafkaSchema( + kafkaConfig, topicList.get(0), schemaInitMaxRead); + kafkaSchemaList = + kafkaSchemaList.stream() + .filter(kafkaSchema -> shouldMonitorTable(kafkaSchema.tableName())) + .collect(Collectors.toList()); + kafkaCanalSchemaMap.put(topicList.get(0), kafkaSchemaList); + } + + return kafkaCanalSchemaMap; + } + + private boolean shouldMonitorTable(String mySqlTableName) { + boolean shouldMonitor = true; + if (includingPattern != null) { + shouldMonitor = includingPattern.matcher(mySqlTableName).matches(); + } + if (excludingPattern != null) { + shouldMonitor = shouldMonitor && !excludingPattern.matcher(mySqlTableName).matches(); + } + LOG.debug("Source table {} is monitored? 
{}", mySqlTableName, shouldMonitor); + return shouldMonitor; + } + + private boolean shouldMonitorTable( + TableSchema tableSchema, Schema schema, Supplier errMsg) { + if (KafkaActionUtils.schemaCompatible(tableSchema, schema)) { + return true; + } else if (ignoreIncompatible) { + LOG.warn(errMsg.get() + "This table will be ignored."); + return false; + } else { + throw new IllegalArgumentException( + errMsg.get() + + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true."); + } + } + + private Supplier incompatibleMessage( + TableSchema paimonSchema, KafkaSchema kafkaSchema, Identifier identifier) { + return () -> + String.format( + "Incompatible schema found.\n" + + "Paimon table is: %s, fields are: %s.\n" + + "Kafka's table is: %s.%s, fields are: %s.\n", + identifier.getFullName(), + paimonSchema.fields(), + kafkaSchema.databaseName(), + kafkaSchema.tableName(), + kafkaSchema.fields()); + } + + // ------------------------------------------------------------------------ + // Flink run methods + // ------------------------------------------------------------------------ + + public static Optional create(String[] args) { + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + + if (params.has("help")) { + printHelp(); + return Optional.empty(); + } + + String warehouse = params.get("warehouse"); + String database = params.get("database"); + int schemaInitMaxRead = Integer.parseInt(params.get("schema-init-max-read")); + boolean ignoreIncompatible = Boolean.parseBoolean(params.get("ignore-incompatible")); + String tablePrefix = params.get("table-prefix"); + String tableSuffix = params.get("table-suffix"); + String includingTables = params.get("including-tables"); + String excludingTables = params.get("excluding-tables"); + + Map kafkaConfigOption = optionalConfigMap(params, "kafka-conf"); + Map catalogConfigOption = optionalConfigMap(params, "catalog-conf"); + Map tableConfigOption = optionalConfigMap(params, "table-conf"); + return Optional.of( + new KafkaSyncDatabaseAction( + kafkaConfigOption, + warehouse, + database, + schemaInitMaxRead, + ignoreIncompatible, + tablePrefix, + tableSuffix, + includingTables, + excludingTables, + catalogConfigOption, + tableConfigOption)); + } + + private static void printHelp() { + System.out.println( + "Action \"kafka-sync-database\" creates a streaming job " + + "with a Flink Kafka source and multiple Paimon table sinks " + + "to synchronize multiple tables into one Paimon database.\n" + + "Only tables with primary keys will be considered. "); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " kafka-sync-database --warehouse --database " + + "[--schema-init-max-read ] " + + "[--ignore-incompatible ] " + + "[--table-prefix ] " + + "[--table-suffix ] " + + "[--including-tables ] " + + "[--excluding-tables ] " + + "[--kafka-conf [--kafka-conf ...]] " + + "[--catalog-conf [--catalog-conf ...]] " + + "[--table-conf [--table-conf ...]]"); + System.out.println(); + + System.out.println( + "--schema-init-max-read is default 1000, if your tables are all from a topic, you can set this parameter to initialize the number of tables to be synchronized."); + System.out.println(); + + System.out.println( + "--ignore-incompatible is default false, in this case, if Topic's table name exists in Paimon " + + "and their schema is incompatible, an exception will be thrown. 
" + + "You can specify it to true explicitly to ignore the incompatible tables and exception."); + System.out.println(); + + System.out.println( + "--table-prefix is the prefix of all Paimon tables to be synchronized. For example, if you want all " + + "synchronized tables to have \"ods_\" as prefix, you can specify `--table-prefix ods_`."); + System.out.println("The usage of --table-suffix is same as `--table-prefix`"); + System.out.println(); + + System.out.println( + "--including-tables is used to specify which source tables are to be synchronized. " + + "You must use '|' to separate multiple tables. Regular expression is supported."); + System.out.println( + "--excluding-tables is used to specify which source tables are not to be synchronized. " + + "The usage is same as --including-tables."); + System.out.println( + "--excluding-tables has higher priority than --including-tables if you specified both."); + System.out.println(); + + System.out.println("kafka source conf syntax:"); + System.out.println(" key=value"); + System.out.println( + "'topic', 'properties.bootstrap.servers', 'properties.group.id'" + + "are required configurations, others are optional."); + System.out.println( + "For a complete list of supported configurations, " + + "see https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/connectors/table/kafka/"); + System.out.println(); + System.out.println(); + + System.out.println("Paimon catalog and table sink conf syntax:"); + System.out.println(" key=value"); + System.out.println("All Paimon sink table will be applied the same set of configurations."); + System.out.println( + "For a complete list of supported configurations, " + + "see https://paimon.apache.org/docs/master/maintenance/configurations/"); + System.out.println(); + + System.out.println("Examples:"); + System.out.println( + " kafka-sync-database \\\n" + + " --warehouse hdfs:///path/to/warehouse \\\n" + + " --database test_db \\\n" + + " --kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \\\n" + + " --kafka-conf topic=order,logistic,user \\\n" + + " --kafka-conf properties.group.id=123456 \\\n" + + " --kafka-conf value.format=canal-json \\\n" + + " --catalog-conf metastore=hive \\\n" + + " --catalog-conf uri=thrift://hive-metastore:9083 \\\n" + + " --table-conf bucket=4 \\\n" + + " --table-conf changelog-producer=input \\\n" + + " --table-conf sink.parallelism=4"); + } + + @Override + public void run() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + build(env); + env.execute(String.format("KAFKA-Paimon Database Sync: %s", database)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index d16270774cfe..ec16e294e3ad 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -140,7 +140,7 @@ public void build(StreamExecutionEnvironment env) throws Exception { KafkaConnectorOptions.VALUE_FORMAT.key() + " cannot be null "); KafkaSource source = KafkaActionUtils.buildKafkaSource(kafkaConfig); String topic = kafkaConfig.get(KafkaConnectorOptions.TOPIC).get(0); - KafkaSchema kafkaSchema = new KafkaSchema(kafkaConfig, topic); + KafkaSchema kafkaSchema = 
KafkaSchema.getKafkaSchema(kafkaConfig, topic); catalog.createDatabase(database, true); boolean caseSensitive = catalog.caseSensitive(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java index df830740c7a4..7d68597b901d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java @@ -37,7 +37,6 @@ import com.alibaba.druid.DbType; import com.alibaba.druid.sql.SQLUtils; -import com.alibaba.druid.sql.ast.SQLName; import com.alibaba.druid.sql.ast.SQLStatement; import com.alibaba.druid.sql.ast.statement.SQLAlterTableAddColumn; import com.alibaba.druid.sql.ast.statement.SQLAlterTableDropColumnItem; @@ -52,7 +51,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -65,6 +63,7 @@ public class CanalJsonEventParser implements EventParser { private static final String FIELD_DATA = "data"; private static final String FIELD_OLD = "old"; private static final String TYPE = "type"; + private static final String MYSQL_TYPE = "mysqlType"; private static final String OP_INSERT = "INSERT"; private static final String OP_UPDATE = "UPDATE"; private static final String OP_DELETE = "DELETE"; @@ -72,8 +71,6 @@ public class CanalJsonEventParser implements EventParser { private final ObjectMapper objectMapper = new ObjectMapper(); private JsonNode root; - private Map mySqlFieldTypes; - private Map paimonFieldTypes; private final boolean caseSensitive; private final TableNameConverter tableNameConverter; @@ -100,15 +97,7 @@ public CanalJsonEventParser( public void setRawEvent(String rawEvent) { try { root = objectMapper.readValue(rawEvent, JsonNode.class); - Preconditions.checkNotNull( - root.get("mysqlType"), - "CanalJsonEventParser only supports canal-json format," - + "please make sure that your topic's format is accurate."); - JsonNode mysqlType = root.get("mysqlType"); - if (!isSchemaChange()) { - updateFieldTypes(mysqlType); - } } catch (Exception e) { throw new RuntimeException(e); } @@ -120,18 +109,6 @@ public String parseTableName() { return tableNameConverter.convert(tableName); } - private void updateFieldTypes(JsonNode schema) { - mySqlFieldTypes = new LinkedHashMap<>(); - paimonFieldTypes = new LinkedHashMap<>(); - Iterator iterator = schema.fieldNames(); - while (iterator.hasNext()) { - String fieldName = iterator.next(); - String fieldType = schema.get(fieldName).asText(); - mySqlFieldTypes.put(fieldName, fieldType); - paimonFieldTypes.put(fieldName, MySqlTypeUtils.toDataType(fieldType)); - } - } - private boolean isSchemaChange() { if (root.get("isDdl") == null) { return false; @@ -151,16 +128,8 @@ public List parseSchemaChange() { if (StringUtils.isEmpty(sql)) { return Collections.emptyList(); } - List result = new ArrayList<>(); int id = 0; - for (Map.Entry fieldType : paimonFieldTypes.entrySet()) { - result.add( - new DataField( - id++, - caseSensitive ? 
fieldType.getKey() : fieldType.getKey().toLowerCase(), - fieldType.getValue())); - } SQLStatement sqlStatement = SQLUtils.parseSingleStatement(sql, DbType.mysql); if (sqlStatement instanceof SQLAlterTableStatement) { SQLAlterTableStatement sqlAlterTableStatement = (SQLAlterTableStatement) sqlStatement; @@ -185,26 +154,9 @@ public List parseSchemaChange() { id++, caseSensitive ? columnName : columnName.toLowerCase(), dataType)); - mySqlFieldTypes.put(columnName, columnType); - paimonFieldTypes.put(columnName, MySqlTypeUtils.toDataType(columnType)); } } else if (sqlAlterTableItem instanceof SQLAlterTableDropColumnItem) { - SQLAlterTableDropColumnItem sqlAlterTableDropColumnItem = - (SQLAlterTableDropColumnItem) sqlAlterTableItem; - List columnNames = - sqlAlterTableDropColumnItem.getColumns().stream() - .map(SQLName::getSimpleName) - .collect(Collectors.toList()); - columnNames.forEach( - columnName -> { - mySqlFieldTypes.remove(columnName); - paimonFieldTypes.remove(columnName); - }); - - result = - result.stream() - .filter(dataField -> !columnNames.contains(dataField.name())) - .collect(Collectors.toList()); + // ignore } else if (sqlAlterTableItem instanceof MySqlAlterTableModifyColumn) { MySqlAlterTableModifyColumn mySqlAlterTableModifyColumn = (MySqlAlterTableModifyColumn) sqlAlterTableItem; @@ -216,26 +168,12 @@ public List parseSchemaChange() { boolean notNull = newColumnDefinition.toString().toUpperCase().contains("NOT NULL"); dataType = notNull ? dataType.notNull() : dataType.nullable(); - DataType finalDataType = dataType; - result = - result.stream() - .map( - dataField -> { - if (dataField.name().equals(columnName)) { - dataField = - new DataField( - dataField.id(), - caseSensitive - ? columnName - : columnName - .toLowerCase(), - finalDataType); - } - return dataField; - }) - .collect(Collectors.toList()); - mySqlFieldTypes.put(columnName, columnType); - paimonFieldTypes.put(columnName, MySqlTypeUtils.toDataType(columnType)); + result.add( + new DataField( + id++, + caseSensitive ? columnName : columnName.toLowerCase(), + dataType)); + } else if (sqlAlterTableItem instanceof MySqlAlterTableChangeColumn) { MySqlAlterTableChangeColumn mySqlAlterTableChangeColumn = (MySqlAlterTableChangeColumn) sqlAlterTableItem; @@ -258,13 +196,9 @@ public List parseSchemaChange() { .collect(Collectors.toList()); result.add( new DataField( - id, + id++, caseSensitive ? 
columnName : columnName.toLowerCase(), dataType)); - mySqlFieldTypes.remove(oldColumnName); - paimonFieldTypes.remove(oldColumnName); - mySqlFieldTypes.put(columnName, columnType); - paimonFieldTypes.put(columnName, MySqlTypeUtils.toDataType(columnType)); } } } @@ -277,7 +211,10 @@ public List parseRecords() { if (isSchemaChange()) { return Collections.emptyList(); } - + Preconditions.checkNotNull( + root.get(TYPE), + "CanalJsonEventParser only supports canal-json format," + + "please make sure that your topic's format is accurate."); List records = new ArrayList<>(); String type = root.get(TYPE).asText(); if (OP_UPDATE.equals(type)) { @@ -324,6 +261,19 @@ public List parseRecords() { } private Map extractRow(JsonNode recordRow) { + Map mySqlFieldTypes = new HashMap<>(); + Preconditions.checkNotNull( + root.get(MYSQL_TYPE), + "CanalJsonEventParser only supports canal-json format," + + "please make sure that your topic's format is accurate."); + JsonNode schema = root.get(MYSQL_TYPE); + Iterator iterator = schema.fieldNames(); + while (iterator.hasNext()) { + String fieldName = iterator.next(); + String fieldType = schema.get(fieldName).asText(); + mySqlFieldTypes.put(fieldName, fieldType); + } + Map jsonMap = objectMapper.convertValue(recordRow, new TypeReference>() {}); if (jsonMap == null) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java similarity index 99% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalActionITCaseBase.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java index de3003b7825e..580240773803 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java @@ -79,11 +79,11 @@ import static org.assertj.core.api.Assertions.assertThat; /** Base test class for {@link org.apache.paimon.flink.action.Action}s related to MySQL. */ -public class KafkaCanalActionITCaseBase extends ActionITCaseBase { +public abstract class KafkaActionITCaseBase extends ActionITCaseBase { private final ObjectMapper objectMapper = new ObjectMapper(); - private static final Logger LOG = LoggerFactory.getLogger(KafkaCanalActionITCaseBase.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaActionITCaseBase.class); private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; private static final Network NETWORK = Network.newNetwork(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java new file mode 100644 index 000000000000..5add09619f03 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java @@ -0,0 +1,672 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** IT cases for {@link KafkaSyncDatabaseAction}. 
*/ +public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase { + + @Test + @Timeout(60) + public void testSchemaEvolutionMultiTopic() throws Exception { + + final String topic1 = "schema_evolution_0"; + final String topic2 = "schema_evolution_1"; + final String topic3 = "schema_evolution_2"; + boolean writeOne = false; + int fileCount = 3; + List topics = Arrays.asList(topic1, topic2, topic3); + topics.forEach( + topic -> { + createTestTopic(topic, 1, 1); + }); + + // ---------- Write the Canal json into Kafka ------------------- + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + topics.get(i), + readLines( + "kafka.canal/database/schemaevolution/topic" + + i + + "/canal-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write canal data to Kafka.", e); + } + } + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "canal-json"); + kafkaConfig.put("topic", String.join(";", topics)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + Map tableConfig = new HashMap<>(); + tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1)); + tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1)); + KafkaSyncDatabaseAction action = + new KafkaSyncDatabaseAction( + kafkaConfig, + warehouse, + database, + false, + Collections.emptyMap(), + tableConfig); + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + + testSchemaEvolutionImpl(topics, writeOne, fileCount); + } + + @Test + // @Timeout(60) + public void testSchemaEvolutionOneTopic() throws Exception { + + final String topic = "schema_evolution"; + boolean writeOne = true; + int fileCount = 3; + List topics = Collections.singletonList(topic); + topics.forEach( + t -> { + createTestTopic(t, 1, 1); + }); + + // ---------- Write the Canal json into Kafka ------------------- + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + topics.get(0), + readLines( + "kafka.canal/database/schemaevolution/topic" + + i + + "/canal-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write canal data to Kafka.", e); + } + } + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "canal-json"); + kafkaConfig.put("topic", String.join(";", topics)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + Map tableConfig = new HashMap<>(); + tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1)); + tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1)); + KafkaSyncDatabaseAction action = + new KafkaSyncDatabaseAction( + kafkaConfig, + warehouse, + database, + false, + Collections.emptyMap(), + tableConfig); + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + + testSchemaEvolutionImpl(topics, writeOne, fileCount); + } + + private void testSchemaEvolutionImpl(List topics, boolean writeOne, int fileCount) + throws Exception { + FileStoreTable table1 = getFileStoreTable("t1"); + FileStoreTable table2 = getFileStoreTable("t2"); + + RowType rowType1 = + RowType.of( + new 
DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k", "v1"}); + List primaryKeys1 = Collections.singletonList("k"); + List expected = Arrays.asList("+I[1, one]", "+I[3, three]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + RowType rowType2 = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10).notNull(), + DataTypes.INT(), + DataTypes.BIGINT() + }, + new String[] {"k1", "k2", "v1", "v2"}); + List primaryKeys2 = Arrays.asList("k1", "k2"); + expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]"); + waitForResult(expected, table2, rowType2, primaryKeys2); + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + writeOne ? topics.get(0) : topics.get(i), + readLines( + "kafka.canal/database/schemaevolution/topic" + + i + + "/canal-data-2.txt")); + } catch (Exception e) { + throw new Exception("Failed to write canal data to Kafka.", e); + } + } + + rowType1 = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT() + }, + new String[] {"k", "v1", "v2"}); + expected = + Arrays.asList( + "+I[1, one, NULL]", + "+I[3, three, NULL]", + "+I[5, five, 50]", + "+I[7, seven, 70]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + rowType2 = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10).notNull(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.VARCHAR(10) + }, + new String[] {"k1", "k2", "v1", "v2", "v3"}); + expected = + Arrays.asList( + "+I[2, two, 20, 200, NULL]", + "+I[4, four, 40, 400, NULL]", + "+I[6, six, 60, 600, string_6]", + "+I[8, eight, 80, 800, string_8]"); + waitForResult(expected, table2, rowType2, primaryKeys2); + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + writeOne ? 
topics.get(0) : topics.get(i), + readLines( + "kafka.canal/database/schemaevolution/topic" + + i + + "/canal-data-3.txt")); + } catch (Exception e) { + throw new Exception("Failed to write canal data to Kafka.", e); + } + } + + rowType1 = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT() + }, + new String[] {"k", "v1", "v2"}); + expected = + Arrays.asList( + "+I[1, one, NULL]", + "+I[3, three, NULL]", + "+I[5, five, 50]", + "+I[7, seven, 70]", + "+I[9, nine, 9000000000000]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + rowType2 = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10).notNull(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.VARCHAR(20) + }, + new String[] {"k1", "k2", "v1", "v2", "v3"}); + expected = + Arrays.asList( + "+I[2, two, 20, 200, NULL]", + "+I[4, four, 40, 400, NULL]", + "+I[6, six, 60, 600, string_6]", + "+I[8, eight, 80, 800, string_8]", + "+I[10, ten, 100, 1000, long_long_string_10]"); + waitForResult(expected, table2, rowType2, primaryKeys2); + } + + @Test + public void testTopicIsEmpty() { + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "canal-json"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + KafkaSyncDatabaseAction action = + new KafkaSyncDatabaseAction( + kafkaConfig, + warehouse, + database, + false, + Collections.emptyMap(), + Collections.emptyMap()); + + IllegalArgumentException e = + assertThrows( + IllegalArgumentException.class, + () -> action.build(env), + "Expecting IllegalArgumentException"); + assertThat(e).hasMessage("topic cannot be null."); + } + + @Test + @Timeout(60) + public void testTableAffixMultiTopic() throws Exception { + // create table t1 + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + catalog.createDatabase(database, true); + Identifier identifier = Identifier.create(database, "test_prefix_t1_test_suffix"); + Schema schema = + Schema.newBuilder() + .column("k1", DataTypes.INT().notNull()) + .column("v0", DataTypes.VARCHAR(10)) + .primaryKey("k1") + .build(); + catalog.createTable(identifier, schema, false); + + final String topic1 = "prefix_suffix_0"; + final String topic2 = "prefix_suffix_1"; + boolean writeOne = false; + int fileCount = 2; + List topics = Arrays.asList(topic1, topic2); + topics.forEach( + topic -> { + createTestTopic(topic, 1, 1); + }); + + // ---------- Write the Canal json into Kafka ------------------- + + for (int i = 0; i < topics.size(); i++) { + try { + writeRecordsToKafka( + topics.get(i), + readLines( + "kafka.canal/database/prefixsuffix/topic" + + i + + "/canal-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write canal data to Kafka.", e); + } + } + + // try synchronization + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "canal-json"); + kafkaConfig.put("topic", String.join(";", topics)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + Map tableConfig = new HashMap<>(); + tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1)); + tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1)); + KafkaSyncDatabaseAction action = + new KafkaSyncDatabaseAction( + kafkaConfig, + 
warehouse, + database, + 0, + false, + "test_prefix_", + "_test_suffix", + null, + null, + Collections.emptyMap(), + tableConfig); + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + + testTableAffixImpl(topics, writeOne, fileCount); + } + + @Test + @Timeout(60) + public void testTableAffixOneTopic() throws Exception { + // create table t1 + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + catalog.createDatabase(database, true); + Identifier identifier = Identifier.create(database, "test_prefix_t1_test_suffix"); + Schema schema = + Schema.newBuilder() + .column("k1", DataTypes.INT().notNull()) + .column("v0", DataTypes.VARCHAR(10)) + .primaryKey("k1") + .build(); + catalog.createTable(identifier, schema, false); + + final String topic1 = "prefix_suffix"; + List topics = Collections.singletonList(topic1); + boolean writeOne = true; + int fileCount = 2; + topics.forEach( + topic -> { + createTestTopic(topic, 1, 1); + }); + + // ---------- Write the Canal json into Kafka ------------------- + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + topics.get(0), + readLines( + "kafka.canal/database/prefixsuffix/topic" + + i + + "/canal-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write canal data to Kafka.", e); + } + } + + // try synchronization + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "canal-json"); + kafkaConfig.put("topic", String.join(";", topics)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + Map tableConfig = new HashMap<>(); + tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1)); + tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1)); + KafkaSyncDatabaseAction action = + new KafkaSyncDatabaseAction( + kafkaConfig, + warehouse, + database, + 0, + false, + "test_prefix_", + "_test_suffix", + null, + null, + Collections.emptyMap(), + tableConfig); + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + + testTableAffixImpl(topics, writeOne, fileCount); + } + + private void testTableAffixImpl(List topics, boolean writeOne, int fileCount) + throws Exception { + FileStoreTable table1 = getFileStoreTable("test_prefix_t1_test_suffix"); + FileStoreTable table2 = getFileStoreTable("test_prefix_t2_test_suffix"); + + RowType rowType1 = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k1", "v0"}); + List primaryKeys1 = Collections.singletonList("k1"); + List expected = Arrays.asList("+I[1, one]", "+I[3, three]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + RowType rowType2 = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k2", "v0"}); + List primaryKeys2 = Collections.singletonList("k2"); + expected = Arrays.asList("+I[2, two]", "+I[4, four]"); + waitForResult(expected, table2, rowType2, primaryKeys2); + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + writeOne ? 
topics.get(0) : topics.get(i), + readLines( + "kafka.canal/database/prefixsuffix/topic" + + i + + "/canal-data-2.txt")); + } catch (Exception e) { + throw new Exception("Failed to write canal data to Kafka.", e); + } + } + rowType1 = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT() + }, + new String[] {"k1", "v0", "v1"}); + expected = + Arrays.asList( + "+I[1, one, NULL]", + "+I[3, three, NULL]", + "+I[5, five, 50]", + "+I[7, seven, 70]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + rowType2 = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(10) + }, + new String[] {"k2", "v0", "v1"}); + expected = + Arrays.asList( + "+I[2, two, NULL]", + "+I[4, four, NULL]", + "+I[6, six, s_6]", + "+I[8, eight, s_8]"); + waitForResult(expected, table2, rowType2, primaryKeys2); + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + writeOne ? topics.get(0) : topics.get(i), + readLines( + "kafka.canal/database/prefixsuffix/topic" + + i + + "/canal-data-3.txt")); + } catch (Exception e) { + throw new Exception("Failed to write canal data to Kafka.", e); + } + } + + rowType1 = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT() + }, + new String[] {"k1", "v0", "v1"}); + expected = + Arrays.asList( + "+I[1, one, NULL]", + "+I[3, three, NULL]", + "+I[5, five, 50]", + "+I[7, seven, 70]", + "+I[9, nine, 9000000000000]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + rowType2 = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(20) + }, + new String[] {"k2", "v0", "v1"}); + expected = + Arrays.asList( + "+I[2, two, NULL]", + "+I[4, four, NULL]", + "+I[6, six, s_6]", + "+I[8, eight, s_8]", + "+I[10, ten, long_s_10]"); + waitForResult(expected, table2, rowType2, primaryKeys2); + } + + @Test + @Timeout(60) + public void testIncludingTables() throws Exception { + includingAndExcludingTablesImpl( + "flink|paimon.+", + null, + Arrays.asList("flink", "paimon_1", "paimon_2"), + Collections.singletonList("ignore")); + } + + @Test + @Timeout(60) + public void testExcludingTables() throws Exception { + includingAndExcludingTablesImpl( + null, + "flink|paimon.+", + Collections.singletonList("ignore"), + Arrays.asList("flink", "paimon_1", "paimon_2")); + } + + @Test + @Timeout(60) + public void testIncludingAndExcludingTables() throws Exception { + includingAndExcludingTablesImpl( + "flink|paimon.+", + "paimon_1", + Arrays.asList("flink", "paimon_2"), + Arrays.asList("paimon_1", "ignore")); + } + + private void includingAndExcludingTablesImpl( + @Nullable String includingTables, + @Nullable String excludingTables, + List<String> existedTables, + List<String> notExistedTables) + throws Exception { + final String topic1 = "include_exclude" + UUID.randomUUID(); + List<String> topics = Collections.singletonList(topic1); + topics.forEach( + topic -> { + createTestTopic(topic, 1, 1); + }); + + // ---------- Write the Canal json into Kafka ------------------- + + try { + writeRecordsToKafka( + topics.get(0), + readLines("kafka.canal/database/include/topic0/canal-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write canal data to Kafka.", e); + } + // try synchronization + Map<String, String> kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "canal-json"); + kafkaConfig.put("topic", String.join(";", topics)); + + StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + Map<String, String> tableConfig = new HashMap<>(); + tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1)); + tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1)); + KafkaSyncDatabaseAction action = + new KafkaSyncDatabaseAction( + kafkaConfig, + warehouse, + database, + 0, + false, + null, + null, + includingTables, + excludingTables, + Collections.emptyMap(), + tableConfig); + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + + // check paimon tables + assertTableExists(existedTables); + assertTableNotExists(notExistedTables); + } + + private FileStoreTable getFileStoreTable(String tableName) throws Exception { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + Identifier identifier = Identifier.create(database, tableName); + return (FileStoreTable) catalog.getTable(identifier); + } + + private void assertTableExists(List<String> tableNames) { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + for (String tableName : tableNames) { + Identifier identifier = Identifier.create(database, tableName); + assertThat(catalog.tableExists(identifier)).isTrue(); + } + } + + private void assertTableNotExists(List<String> tableNames) { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + for (String tableName : tableNames) { + Identifier identifier = Identifier.create(database, tableName); + assertThat(catalog.tableExists(identifier)).isFalse(); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java index fe43458a0a6b..fd7569acfb7c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java @@ -52,7 +52,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; /** IT cases for {@link KafkaCanalSyncTableActionITCase}.
*/ -public class KafkaCanalSyncTableActionITCase extends KafkaCanalActionITCaseBase { +public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase { @Test @Timeout(60) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java index ca84ecfd1f39..168f92641a76 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java @@ -146,55 +146,34 @@ public void testCanalJsonEventParserDdl() { new CanalJsonEventParser(caseSensitive, new TableNameConverter(caseSensitive)); parser.setRawEvent(CANAL_JSON_EVENT); List<DataField> expectDataFields = new ArrayList<>(); - expectDataFields.add(new DataField(0, "pt", DataTypes.INT())); - expectDataFields.add(new DataField(1, "_id", DataTypes.INT())); - expectDataFields.add(new DataField(2, "v1", DataTypes.VARCHAR(10))); - expectDataFields.add(new DataField(3, "_geometrycollection", DataTypes.STRING())); - expectDataFields.add(new DataField(4, "_set", DataTypes.ARRAY(DataTypes.STRING()))); - expectDataFields.add(new DataField(5, "_enum", DataTypes.STRING())); - expectDataFields.add(new DataField(6, "v2", DataTypes.INT())); + expectDataFields.add(new DataField(0, "v2", DataTypes.INT())); assertThat(parser.parseSchemaChange()).isEmpty(); parser.setRawEvent(CANAL_JSON_DDL_ADD_EVENT); List<DataField> updatedDataFieldsAdd = parser.parseSchemaChange(); assertThat(updatedDataFieldsAdd).isEqualTo(expectDataFields); - expectDataFields.remove(2); - expectDataFields.add(2, new DataField(2, "v1", DataTypes.VARCHAR(20))); + expectDataFields.clear(); + expectDataFields.add(new DataField(0, "v1", DataTypes.VARCHAR(20))); parser.setRawEvent(CANAL_JSON_DDL_MODIFY_EVENT); List<DataField> updatedDataFieldsModify = parser.parseSchemaChange(); assertThat(updatedDataFieldsModify).isEqualTo(expectDataFields); - expectDataFields.remove(2); - expectDataFields.add(2, new DataField(2, "v1", DataTypes.VARCHAR(30))); - expectDataFields.remove(6); - expectDataFields.add(new DataField(6, "v2", DataTypes.BIGINT())); + expectDataFields.clear(); + + expectDataFields.add(new DataField(0, "v4", DataTypes.INT())); + expectDataFields.add(new DataField(1, "v1", DataTypes.VARCHAR(30))); + + expectDataFields.add(new DataField(2, "v5", DataTypes.DOUBLE())); + expectDataFields.add(new DataField(3, "v6", DataTypes.DECIMAL(5, 3))); + expectDataFields.add(new DataField(4, "$% ^,& *(", DataTypes.VARCHAR(10))); + expectDataFields.add(new DataField(5, "v2", DataTypes.BIGINT())); - expectDataFields.add(new DataField(7, "v4", DataTypes.INT())); - expectDataFields.add(new DataField(8, "v5", DataTypes.DOUBLE())); - expectDataFields.add(new DataField(9, "v6", DataTypes.DECIMAL(5, 3))); - expectDataFields.add(new DataField(10, "$% ^,& *(", DataTypes.VARCHAR(10))); parser.setRawEvent(CANAL_JSON_DDL_MULTI_ADD_EVENT); List<DataField> updatedDataFieldsMulti = parser.parseSchemaChange(); assertThat(updatedDataFieldsMulti).isEqualTo(expectDataFields); - parser.setRawEvent(CANAL_JSON_DDL_DROP_EVENT); - List<DataField> updatedDataFieldsDrop = parser.parseSchemaChange(); - for (int i = 0; i < expectDataFields.size(); i++) { - expectDataFields.set( - i, - new DataField( - i, expectDataFields.get(i).name(), expectDataFields.get(i).type())); - } - expectDataFields.remove(2); -
assertThat(updatedDataFieldsDrop).isEqualTo(expectDataFields); + expectDataFields.clear(); parser.setRawEvent(CANAL_JSON_DDL_CHANGE_EVENT); List<DataField> updatedDataFieldsChange = parser.parseSchemaChange(); - expectDataFields.remove(9); - for (int i = 0; i < expectDataFields.size(); i++) { - expectDataFields.set( - i, - new DataField( - i, expectDataFields.get(i).name(), expectDataFields.get(i).type())); - } - expectDataFields.add(new DataField(10, "cg", DataTypes.VARCHAR(20))); + expectDataFields.add(new DataField(0, "cg", DataTypes.VARCHAR(20))); assertThat(updatedDataFieldsChange).isEqualTo(expectDataFields); } @@ -203,15 +182,13 @@ public void testCanalJsonEventParserParseDebeziumJson() { boolean caseSensitive = true; EventParser<String> parser = new CanalJsonEventParser(caseSensitive, new TableNameConverter(caseSensitive)); - + parser.setRawEvent(DEBEZIUM_JSON_EVENT); RuntimeException e = assertThrows( - RuntimeException.class, - () -> parser.setRawEvent(DEBEZIUM_JSON_EVENT), - "Expecting RuntimeException"); + RuntimeException.class, parser::parseRecords, "Expecting RuntimeException"); assertThat(e) .hasMessage( - "java.lang.NullPointerException: CanalJsonEventParser only supports canal-json format,please make sure that your topic's format is accurate."); + "CanalJsonEventParser only supports canal-json format,please make sure that your topic's format is accurate."); } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaTest.java index 9f236af5a080..aa4236aa1114 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaTest.java @@ -32,7 +32,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link KafkaSchema}. */ -public class KafkaSchemaTest extends KafkaCanalActionITCaseBase { +public class KafkaSchemaTest extends KafkaActionITCaseBase { @Test @Timeout(60) public void testKafkaSchema() throws Exception { @@ -49,7 +49,8 @@ public void testKafkaSchema() throws Exception { kafkaConfig.put("value.format", "canal-json"); kafkaConfig.put("topic", topic); - KafkaSchema kafkaSchema = new KafkaSchema(Configuration.fromMap(kafkaConfig), topic); + KafkaSchema kafkaSchema = + KafkaSchema.getKafkaSchema(Configuration.fromMap(kafkaConfig), topic); Map<String, DataType> fields = new LinkedHashMap<>(); fields.put("pt", DataTypes.INT()); fields.put("_id", DataTypes.INT()); diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/include/topic0/canal-data-1.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/include/topic0/canal-data-1.txt new file mode 100644 index 000000000000..67520fb36086 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/include/topic0/canal-data-1.txt @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"k1":"1","v0":"one"}],"database":"paimon_sync_database_affix","es":1684770066000,"id":76,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12},"table":"paimon_1","ts":1684770066165,"type":"INSERT"} +{"data":[{"k1":"3","v0":"three"}],"database":"paimon_sync_database_affix","es":1684770066000,"id":78,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12},"table":"paimon_2","ts":1684770066821,"type":"INSERT"} +{"data":[{"k1":"3","v0":"three"}],"database":"paimon_sync_database_affix","es":1684770066000,"id":78,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12},"table":"ignore","ts":1684770066821,"type":"INSERT"} +{"data":[{"k1":"3","v0":"three"}],"database":"paimon_sync_database_affix","es":1684770066000,"id":78,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12},"table":"flink","ts":1684770066821,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-1.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-1.txt new file mode 100644 index 000000000000..c08b80c0c88a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-1.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"data":[{"k1":"1","v0":"one"}],"database":"paimon_sync_database_affix","es":1684770066000,"id":76,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12},"table":"t1","ts":1684770066165,"type":"INSERT"} +{"data":[{"k1":"3","v0":"three"}],"database":"paimon_sync_database_affix","es":1684770066000,"id":78,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12},"table":"t1","ts":1684770066821,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-2.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-2.txt new file mode 100644 index 000000000000..486a4e8431d1 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-2.txt @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":null,"database":"paimon_sync_database_affix","es":1684770071000,"id":80,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* Query from DMS-WEBSQL-0-Qid_3456357374451661b by user 1486767996652600 */ ALTER TABLE t1 ADD COLUMN v1 INT","sqlType":null,"table":"t1","ts":1684770071947,"type":"ALTER"} +{"data":[{"k1":"5","v0":"five","v1":"50"}],"database":"paimon_sync_database_affix","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770072286,"type":"INSERT"} +{"data":[{"k1":"7","v0":"seven","v1":"70"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":84,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770073254,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-3.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-3.txt new file mode 100644 index 000000000000..d86e67b38c25 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic0/canal-data-3.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":null,"database":"paimon_sync_database_affix","es":1684770078000,"id":86,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* Query from DMS-WEBSQL-0-Qid_3456356115855017B by user 1486767996652600 */ ALTER TABLE t1 MODIFY COLUMN v1 BIGINT","sqlType":null,"table":"t1","ts":1684770078387,"type":"ALTER"} +{"data":[{"k1":"9","v0":"nine","v1":"9000000000000"}],"database":"paimon_sync_database_affix","es":1684770078000,"id":87,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","v1":"BIGINT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":-5},"table":"t1","ts":1684770078907,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-1.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-1.txt new file mode 100644 index 000000000000..56930133a586 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-1.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"k2":"2","v0":"two"}],"database":"paimon_sync_database_affix","es":1684770066000,"id":77,"isDdl":false,"mysqlType":{"k2":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k2"],"sql":"","sqlType":{"k2":4,"v0":12},"table":"t2","ts":1684770066495,"type":"INSERT"} +{"data":[{"k2":"4","v0":"four"}],"database":"paimon_sync_database_affix","es":1684770067000,"id":79,"isDdl":false,"mysqlType":{"k2":"INT","v0":"VARCHAR(10)"},"old":null,"pkNames":["k2"],"sql":"","sqlType":{"k2":4,"v0":12},"table":"t2","ts":1684770067141,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-2.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-2.txt new file mode 100644 index 000000000000..00ec34d5bde3 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-2.txt @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":null,"database":"paimon_sync_database_affix","es":1684770072000,"id":82,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* Query from DMS-WEBSQL-0-Qid_3456357374454209u by user 1486767996652600 */ ALTER TABLE t2 ADD COLUMN v1 VARCHAR(10)","sqlType":null,"table":"t2","ts":1684770072606,"type":"ALTER"} +{"data":[{"k2":"6","v0":"six","v1":"s_6"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":83,"isDdl":false,"mysqlType":{"k2":"INT","v0":"VARCHAR(10)","v1":"VARCHAR(10)"},"old":null,"pkNames":["k2"],"sql":"","sqlType":{"k2":4,"v0":12,"v1":12},"table":"t2","ts":1684770072927,"type":"INSERT"} +{"data":[{"k2":"8","v0":"eight","v1":"s_8"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":85,"isDdl":false,"mysqlType":{"k2":"INT","v0":"VARCHAR(10)","v1":"VARCHAR(10)"},"old":null,"pkNames":["k2"],"sql":"","sqlType":{"k2":4,"v0":12,"v1":12},"table":"t2","ts":1684770073583,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-3.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-3.txt new file mode 100644 index 000000000000..6ed3e9c5a597 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/prefixsuffix/topic1/canal-data-3.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"data":null,"database":"paimon_sync_database_affix","es":1684770079000,"id":88,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* Query from DMS-WEBSQL-0-Qid_3456356115857302y by user 1486767996652600 */ ALTER TABLE t2 MODIFY COLUMN v1 VARCHAR(20)","sqlType":null,"table":"t2","ts":1684770079231,"type":"ALTER"} +{"data":[{"k2":"10","v0":"ten","v1":"long_s_10"}],"database":"paimon_sync_database_affix","es":1684770079000,"id":89,"isDdl":false,"mysqlType":{"k2":"INT","v0":"VARCHAR(10)","v1":"VARCHAR(20)"},"old":null,"pkNames":["k2"],"sql":"","sqlType":{"k2":4,"v0":12,"v1":12},"table":"t2","ts":1684770079553,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-1.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-1.txt new file mode 100644 index 000000000000..10a79de124bd --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-1.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"k":"1","v1":"one"}],"database":"paimon_sync_database","es":1684484516000,"id":608,"isDdl":false,"mysqlType":{"k":"int","v1":"varchar(10)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12},"table":"t1","ts":1684484516409,"type":"INSERT"} +{"data":[{"k":"3","v1":"three"}],"database":"paimon_sync_database","es":1684484517000,"id":610,"isDdl":false,"mysqlType":{"k":"int","v1":"varchar(10)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12},"table":"t1","ts":1684484517136,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-2.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-2.txt new file mode 100644 index 000000000000..17d50c6038ed --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-2.txt @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":null,"database":"paimon_sync_database","es":1684484822000,"id":633,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* Query from DMS-WEBSQL-0-Qid_30755036368248687I by user 1486767996652600 */ ALTER TABLE t1 ADD COLUMN v2 INT","sqlType":null,"table":"t1","ts":1684484823122,"type":"ALTER"} +{"data":[{"k":"5","v1":"five","v2":"50"}],"database":"paimon_sync_database","es":1684484823000,"id":633,"isDdl":false,"mysqlType":{"k":"int","v1":"varchar(10)","v2":"INT"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12,"v2":4},"table":"t1","ts":1684484823122,"type":"INSERT"} +{"data":[{"k":"7","v1":"seven","v2":"70"}],"database":"paimon_sync_database","es":1684484823000,"id":636,"isDdl":false,"mysqlType":{"k":"int","v1":"varchar(10)","v2":"INT"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12,"v2":4},"table":"t1","ts":1684484823894,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-3.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-3.txt new file mode 100644 index 000000000000..9f9d5d82aee4 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic0/canal-data-3.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"data":null,"database":"paimon_sync_database","es":1684484887000,"id":642,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* Query from DMS-WEBSQL-0-Qid_3171173119829196a by user 1486767996652600 */ ALTER TABLE t1 MODIFY COLUMN v2 BIGINT","sqlType":null,"table":"t1","ts":1684484887766,"type":"ALTER"} +{"data":[{"k":"9","v1":"nine","v2":"9000000000000"}],"database":"paimon_sync_database","es":1684484888000,"id":643,"isDdl":false,"mysqlType":{"k":"int","v1":"varchar(10)","v2":"BIGINT"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12,"v2":-5},"table":"t1","ts":1684484888087,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-1.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-1.txt new file mode 100644 index 000000000000..c7765bf4d32d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-1.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"k1":"2","k2":"two","v1":"20","v2":"200"}],"database":"paimon_sync_database","es":1684484516000,"id":609,"isDdl":false,"mysqlType":{"k1":"int","k2":"varchar(10)","v1":"int","v2":"bigint"},"old":null,"pkNames":["k1","k2"],"sql":"","sqlType":{"k1":4,"k2":12,"v1":4,"v2":-5},"table":"t2","ts":1684484516814,"type":"INSERT"} +{"data":[{"k1":"4","k2":"four","v1":"40","v2":"400"}],"database":"paimon_sync_database","es":1684484517000,"id":611,"isDdl":false,"mysqlType":{"k1":"int","k2":"varchar(10)","v1":"int","v2":"bigint"},"old":null,"pkNames":["k1","k2"],"sql":"","sqlType":{"k1":4,"k2":12,"v1":4,"v2":-5},"table":"t2","ts":1684484517457,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-2.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-2.txt new file mode 100644 index 000000000000..4abf9ee0f41e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-2.txt @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":null,"database":"paimon_sync_database","es":1684484823000,"id":634,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* Query from DMS-WEBSQL-0-Qid_30755036368251972G by user 1486767996652600 */ ALTER TABLE t2 ADD COLUMN v3 VARCHAR(10)","sqlType":null,"table":"t2","ts":1684484823441,"type":"ALTER"} +{"data":[{"k1":"6","k2":"six","v1":"60","v2":"600","v3":"string_6"}],"database":"paimon_sync_database","es":1684484823000,"id":635,"isDdl":false,"mysqlType":{"k1":"int","k2":"varchar(10)","v1":"int","v2":"bigint","v3":"VARCHAR(10)"},"old":null,"pkNames":["k1","k2"],"sql":"","sqlType":{"k1":4,"k2":12,"v1":4,"v2":-5,"v3":12},"table":"t2","ts":1684484823569,"type":"INSERT"} +{"data":[{"k1":"8","k2":"eight","v1":"80","v2":"800","v3":"string_8"}],"database":"paimon_sync_database","es":1684484824000,"id":637,"isDdl":false,"mysqlType":{"k1":"int","k2":"varchar(10)","v1":"int","v2":"bigint","v3":"VARCHAR(10)"},"old":null,"pkNames":["k1","k2"],"sql":"","sqlType":{"k1":4,"k2":12,"v1":4,"v2":-5,"v3":12},"table":"t2","ts":1684484824017,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-3.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-3.txt new file mode 100644 index 000000000000..7db1c1aef060 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic1/canal-data-3.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"data":null,"database":"paimon_sync_database","es":1684484888000,"id":644,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* Query from DMS-WEBSQL-0-Qid_3171173119831958d by user 1486767996652600 */ ALTER TABLE t2 MODIFY COLUMN v3 VARCHAR(20)","sqlType":null,"table":"t2","ts":1684484888410,"type":"ALTER"} +{"data":[{"k1":"10","k2":"ten","v1":"100","v2":"1000","v3":"long_long_string_10"}],"database":"paimon_sync_database","es":1684484888000,"id":645,"isDdl":false,"mysqlType":{"k1":"int","k2":"varchar(10)","v1":"int","v2":"bigint","v3":"VARCHAR(20)"},"old":null,"pkNames":["k1","k2"],"sql":"","sqlType":{"k1":4,"k2":12,"v1":4,"v2":-5,"v3":12},"table":"t2","ts":1684484888731,"type":"INSERT"} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-1.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-1.txt new file mode 100644 index 000000000000..3e0e7fb3d06d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-1.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"data":[{"v1":"-1","__#alibaba_rds_row_id#__":"1"}],"database":"paimon_sync_database","es":1684484517000,"id":612,"isDdl":false,"mysqlType":{"v1":"int","__#alibaba_rds_row_id#__":"bigint"},"old":null,"pkNames":["__#alibaba_rds_row_id#__"],"sql":"","sqlType":{"v1":4,"__#alibaba_rds_row_id#__":-5},"table":"t3","ts":1684484517756,"type":"INSERT"} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-2.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-2.txt new file mode 100644 index 000000000000..e5cdfe96e7bb --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-2.txt @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-3.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-3.txt new file mode 100644 index 000000000000..e5cdfe96e7bb --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/database/schemaevolution/topic2/canal-data-3.txt @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +