getTableMapping() {
return tableMapping;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
index 614f06a71..09f0f3a69 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
@@ -35,7 +35,16 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-/** Use expression to match ddl sql. */
+/**
+ * Use expression to match ddl sql.
+ *
+ * The way of parsing DDL statements relies on regular expression matching, and this parsing
+ * method has many flaws. In order to solve this problem, we introduced the com.github.jsqlparser
+ * framework, which can accurately parse the schema change of DDL.
+ *
+ *
This class is no longer recommended, we recommend using {@link SQLParserSchemaChange}
+ */
+@Deprecated
public class JsonDebeziumSchemaChangeImpl extends JsonDebeziumSchemaChange {
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChangeImpl.class);
// alter table tbl add cloumn aca int
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 9b41e2fdb..7ef975e2b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -40,10 +40,6 @@
import org.apache.doris.flink.sink.writer.EventType;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceConnector;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
-import org.apache.doris.flink.tools.cdc.oracle.OracleType;
-import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
-import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +51,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
@@ -74,12 +71,11 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
Pattern.CASE_INSENSITIVE);
// schemaChange saves table names, field, and field column information
private Map> originFieldSchemaMap = new LinkedHashMap<>();
- private SourceConnector sourceConnector;
// create table properties
private final Map tableProperties;
- private String targetDatabase;
- private String targetTablePrefix;
- private String targetTableSuffix;
+ private final String targetDatabase;
+ private final String targetTablePrefix;
+ private final String targetTableSuffix;
private final Set filledTables = new HashSet<>();
public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
@@ -124,6 +120,7 @@ public boolean schemaChange(JsonNode recordRoot) {
EventType eventType = extractEventType(recordRoot);
if (eventType == null) {
+ LOG.warn("Failed to parse eventType. recordRoot={}", recordRoot);
return false;
}
if (eventType.equals(EventType.CREATE)) {
@@ -137,43 +134,20 @@ public boolean schemaChange(JsonNode recordRoot) {
LOG.info("create table ddl status: {}", status);
}
} else if (eventType.equals(EventType.ALTER)) {
- // db,table
- Tuple2 tuple = getDorisTableTuple(recordRoot);
- if (tuple == null) {
+ Tuple2 dorisTableTuple = getDorisTableTuple(recordRoot);
+ if (dorisTableTuple == null) {
+ LOG.warn("Failed to get doris table tuple. record={}", recordRoot);
return false;
}
List ddlSqlList = extractDDLList(recordRoot);
- if (CollectionUtils.isEmpty(ddlSqlList)) {
- LOG.info("ddl can not do schema change:{}", recordRoot);
- return false;
- }
- List ddlSchemas = SchemaChangeHelper.getDdlSchemas();
- for (int i = 0; i < ddlSqlList.size(); i++) {
- DDLSchema ddlSchema = ddlSchemas.get(i);
- String ddlSql = ddlSqlList.get(i);
- boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
- status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
- LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
- }
- } else {
- LOG.info("Unsupported event type {}", eventType);
+ status = executeAlterDDLs(ddlSqlList, recordRoot, dorisTableTuple, status);
}
} catch (Exception ex) {
- LOG.warn("schema change error :", ex);
+ LOG.warn("schema change error : ", ex);
}
return status;
}
- private JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
- JsonNode historyRecord = extractHistoryRecord(record);
- JsonNode tableChanges = historyRecord.get("tableChanges");
- if (!Objects.isNull(tableChanges)) {
- JsonNode tableChange = tableChanges.get(0);
- return tableChange;
- }
- return null;
- }
-
/** Parse Alter Event. */
@VisibleForTesting
public List extractDDLList(JsonNode record) throws IOException {
@@ -181,11 +155,19 @@ public List extractDDLList(JsonNode record) throws IOException {
JsonDebeziumChangeUtils.getDorisTableIdentifier(record, dorisOptions, tableMapping);
JsonNode historyRecord = extractHistoryRecord(record);
String ddl = extractJsonNode(historyRecord, "ddl");
+ extractSourceConnector(record);
+ return parserDebeziumStructure(dorisTable, ddl, record);
+ }
+
+ private List parserDebeziumStructure(String dorisTable, String ddl, JsonNode record)
+ throws JsonProcessingException {
JsonNode tableChange = extractTableChange(record);
- EventType eventType = extractEventType(record);
- if (Objects.isNull(tableChange)
- || Objects.isNull(ddl)
- || !eventType.equals(EventType.ALTER)) {
+ if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
+ LOG.warn(
+ "tableChange or ddl is empty, cannot do schema change. dorisTable={}, tableChange={}, ddl={}",
+ dorisTable,
+ tableChange,
+ ddl);
return null;
}
@@ -284,7 +266,7 @@ public Integer getTableSchemaBuckets(Map tableBucketsMap, Strin
return tableBucketsMap.get(tableName);
}
// Secondly, iterate over the map to find a corresponding regular expression match,
- for (Map.Entry entry : tableBucketsMap.entrySet()) {
+ for (Entry entry : tableBucketsMap.entrySet()) {
Pattern pattern = Pattern.compile(entry.getKey());
if (pattern.matcher(tableName).matches()) {
@@ -301,7 +283,7 @@ private List buildDistributeKeys(
return primaryKeys;
}
if (!fields.isEmpty()) {
- Map.Entry firstField = fields.entrySet().iterator().next();
+ Entry firstField = fields.entrySet().iterator().next();
return Collections.singletonList(firstField.getKey());
}
return new ArrayList<>();
@@ -320,21 +302,6 @@ private boolean checkSchemaChange(String database, String table, DDLSchema ddlSc
return schemaChangeManager.checkSchemaChange(database, table, param);
}
- /** Parse event type. */
- protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
- JsonNode tableChange = extractTableChange(record);
- if (tableChange == null || tableChange.get("type") == null) {
- return null;
- }
- String type = tableChange.get("type").asText();
- if (EventType.ALTER.toString().equalsIgnoreCase(type)) {
- return EventType.ALTER;
- } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) {
- return EventType.CREATE;
- }
- return null;
- }
-
private Map extractBeforeRow(JsonNode record) {
return extractRow(record.get("before"));
}
@@ -402,25 +369,8 @@ public String buildDorisTypeName(JsonNode column) {
int length = column.get("length") == null ? 0 : column.get("length").asInt();
int scale = column.get("scale") == null ? 0 : column.get("scale").asInt();
String sourceTypeName = column.get("typeName").asText();
- String dorisTypeName;
- switch (sourceConnector) {
- case MYSQL:
- dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, scale);
- break;
- case ORACLE:
- dorisTypeName = OracleType.toDorisType(sourceTypeName, length, scale);
- break;
- case POSTGRES:
- dorisTypeName = PostgresType.toDorisType(sourceTypeName, length, scale);
- break;
- case SQLSERVER:
- dorisTypeName = SqlServerType.toDorisType(sourceTypeName, length, scale);
- break;
- default:
- String errMsg = "Not support " + sourceTypeName + " schema change.";
- throw new UnsupportedOperationException(errMsg);
- }
- return dorisTypeName;
+ return JsonDebeziumChangeUtils.buildDorisTypeName(
+ sourceConnector, sourceTypeName, length, scale);
}
private String handleDefaultValue(String defaultValue) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
new file mode 100644
index 000000000..6be3f72c8
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.sink.schema.SQLParserSchemaManager;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
+ private static final Logger LOG = LoggerFactory.getLogger(SQLParserSchemaChange.class);
+ private final SQLParserSchemaManager sqlParserSchemaManager;
+
+ public SQLParserSchemaChange(JsonDebeziumChangeContext changeContext) {
+ this.changeContext = changeContext;
+ this.dorisOptions = changeContext.getDorisOptions();
+ this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+ this.sqlParserSchemaManager = new SQLParserSchemaManager();
+ this.tableMapping = changeContext.getTableMapping();
+ this.objectMapper = changeContext.getObjectMapper();
+ }
+
+ @Override
+ public void init(JsonNode recordRoot, String dorisTableName) {
+ // do nothing
+ }
+
+ @Override
+ public boolean schemaChange(JsonNode recordRoot) {
+ boolean status = false;
+ try {
+ if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
+ return false;
+ }
+
+ EventType eventType = extractEventType(recordRoot);
+ if (eventType == null) {
+ LOG.warn("Failed to parse eventType. recordRoot={}", recordRoot);
+ return false;
+ }
+
+ if (eventType.equals(EventType.CREATE)) {
+ // TODO support auto create table
+ LOG.warn("Not auto support create table. recordRoot={}", recordRoot);
+ } else if (eventType.equals(EventType.ALTER)) {
+ Tuple2 dorisTableTuple = getDorisTableTuple(recordRoot);
+ if (dorisTableTuple == null) {
+ LOG.warn("Failed to get doris table tuple. record={}", recordRoot);
+ return false;
+ }
+ List ddlList = tryParserAlterDDLs(recordRoot);
+ status = executeAlterDDLs(ddlList, recordRoot, dorisTableTuple, status);
+ }
+ } catch (Exception ex) {
+ LOG.warn("schema change error : ", ex);
+ }
+ return status;
+ }
+
+ @VisibleForTesting
+ public List tryParserAlterDDLs(JsonNode record) throws IOException {
+ String dorisTable =
+ JsonDebeziumChangeUtils.getDorisTableIdentifier(record, dorisOptions, tableMapping);
+ JsonNode historyRecord = extractHistoryRecord(record);
+ String ddl = extractJsonNode(historyRecord, "ddl");
+ extractSourceConnector(record);
+ return sqlParserSchemaManager.parserAlterDDLs(sourceConnector, ddl, dorisTable);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 38b942ea4..55e864ca4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -130,6 +130,7 @@ private static void syncDatabase(
String excludingTables = params.get("excluding-tables");
String multiToOneOrigin = params.get("multi-to-one-origin");
String multiToOneTarget = params.get("multi-to-one-target");
+ String schemaChangeMode = params.get("schema-change-mode");
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
boolean ignoreIncompatible = params.has("ignore-incompatible");
@@ -157,6 +158,7 @@ private static void syncDatabase(
.setCreateTableOnly(createTableOnly)
.setSingleSink(singleSink)
.setIgnoreIncompatible(ignoreIncompatible)
+ .setSchemaChangeMode(schemaChangeMode)
.create();
databaseSync.build();
if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 5cea70f9c..9c4f2ac40 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -78,6 +78,7 @@ public abstract class DatabaseSync {
public StreamExecutionEnvironment env;
private boolean createTableOnly = false;
private boolean newSchemaChange = true;
+ private String schemaChangeMode;
protected String includingTables;
protected String excludingTables;
protected String multiToOneOrigin;
@@ -342,6 +343,7 @@ public DorisRecordSerializer buildSchemaSerializer(
return JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisBuilder.build())
.setNewSchemaChange(newSchemaChange)
+ .setSchemaChangeMode(schemaChangeMode)
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
.setTableProperties(tableConfig)
@@ -560,6 +562,11 @@ public DatabaseSync setNewSchemaChange(boolean newSchemaChange) {
return this;
}
+ public DatabaseSync setSchemaChangeMode(String schemaChangeMode) {
+ this.schemaChangeMode = schemaChangeMode.trim();
+ return this;
+ }
+
public DatabaseSync setSingleSink(boolean singleSink) {
this.singleSink = singleSink;
return this;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
new file mode 100644
index 000000000..cbe3f08ab
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class SQLParserSchemaManagerTest {
+ private SQLParserSchemaManager schemaManager;
+ private String dorisTable;
+
+ @Before
+ public void init() {
+ schemaManager = new SQLParserSchemaManager();
+ dorisTable = "doris.tab";
+ }
+
+ @Test
+ public void testParserAlterDDLs() {
+ List expectDDLs = new ArrayList<>();
+ expectDDLs.add("ALTER TABLE `doris`.`tab` DROP COLUMN `c1`");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` DROP COLUMN `c2`");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `c3` INT DEFAULT '100'");
+ expectDDLs.add(
+ "ALTER TABLE `doris`.`tab` ADD COLUMN `decimal_type` DECIMALV3(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment'");
+ expectDDLs.add(
+ "ALTER TABLE `doris`.`tab` ADD COLUMN `create_time` DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP COMMENT 'time_comment'");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c10` `c11`");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c12` `c13`");
+
+ SourceConnector mysql = SourceConnector.MYSQL;
+ String ddl =
+ "alter table t1 drop c1, drop column c2, add c3 int default 100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment', add `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change column c12 c13 varchar(10)";
+ List actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl, dorisTable);
+ for (String actualDDL : actualDDLs) {
+ Assert.assertTrue(expectDDLs.contains(actualDDL));
+ }
+ }
+
+ @Test
+ public void testParserAlterDDLsAdd() {
+ List expectDDLs = new ArrayList<>();
+ expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `phone_number` VARCHAR(60)");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `address` VARCHAR(765)");
+
+ SourceConnector mysql = SourceConnector.ORACLE;
+ String ddl =
+ "ALTER TABLE employees ADD (phone_number VARCHAR2(20), address VARCHAR2(255));";
+ List actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl, dorisTable);
+ for (String actualDDL : actualDDLs) {
+ Assert.assertTrue(expectDDLs.contains(actualDDL));
+ }
+ }
+
+ @Test
+ public void testParserAlterDDLsChange() {
+ List expectDDLs = new ArrayList<>();
+ expectDDLs.add(
+ "ALTER TABLE `doris`.`tab` RENAME COLUMN `old_phone_number` `new_phone_number`");
+ expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `old_address` `new_address`");
+
+ SourceConnector mysql = SourceConnector.MYSQL;
+ String ddl =
+ "ALTER TABLE employees\n"
+ + "CHANGE COLUMN old_phone_number new_phone_number VARCHAR(20) NOT NULL,\n"
+ + "CHANGE COLUMN old_address new_address VARCHAR(255) DEFAULT 'Unknown',\n"
+ + "MODIFY COLUMN hire_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;";
+ List actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl, dorisTable);
+ for (String actualDDL : actualDDLs) {
+ Assert.assertTrue(expectDDLs.contains(actualDDL));
+ }
+ }
+
+ @Test
+ public void testExtractCommentValue() {
+ String expectComment = "";
+ List columnSpecs = Arrays.asList("default", "'100'", "COMMENT", "''");
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertEquals(expectComment, actualComment);
+ }
+
+ @Test
+ public void testExtractCommentValueQuotes() {
+ String expectComment = "comment_test";
+ List columnSpecs =
+ Arrays.asList("Default", "\"100\"", "comment", "\"comment_test\"");
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertEquals(expectComment, actualComment);
+ }
+
+ @Test
+ public void testExtractCommentValueNull() {
+ List columnSpecs = Arrays.asList("default", null, "CommenT", null);
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertNull(actualComment);
+ }
+
+ @Test
+ public void testExtractCommentValueEmpty() {
+ List columnSpecs = Arrays.asList("default", null, "comment");
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertNull(actualComment);
+ }
+
+ @Test
+ public void testExtractCommentValueA() {
+ String expectComment = "test";
+ List columnSpecs = Arrays.asList("comment", "test");
+ String actualComment = schemaManager.extractComment(columnSpecs);
+ Assert.assertEquals(expectComment, actualComment);
+ }
+
+ @Test
+ public void testExtractDefaultValue() {
+ String expectDefault = "100";
+ List columnSpecs = Arrays.asList("default", "'100'", "comment", "");
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertEquals(expectDefault, actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueQuotes() {
+ String expectDefault = "100";
+ List columnSpecs = Arrays.asList("default", "\"100\"", "comment", "");
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertEquals(expectDefault, actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueNull() {
+ List columnSpecs = Arrays.asList("Default", null, "comment", null);
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertNull(actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueEmpty() {
+ String expectDefault = null;
+ List columnSpecs = Arrays.asList("DEFAULT", "comment", null);
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertEquals(expectDefault, actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueA() {
+ String expectDefault = "aaa";
+ List columnSpecs = Arrays.asList("default", "aaa");
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertEquals(expectDefault, actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueNULL() {
+ List columnSpecs = Collections.singletonList("default");
+ String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ Assert.assertNull(actualDefault);
+ }
+
+ @Test
+ public void testRemoveContinuousChar() {
+ // Test removing continuous target characters from both ends
+ Assert.assertEquals("bc", schemaManager.removeContinuousChar("aaaabcaaa", 'a'));
+ Assert.assertEquals("bcde", schemaManager.removeContinuousChar("abcdea", 'a'));
+
+ // Test cases with no target character
+ Assert.assertEquals("abc", schemaManager.removeContinuousChar("abc", 'x'));
+
+ // Test cases with only target characters
+ Assert.assertEquals("", schemaManager.removeContinuousChar("aaaa", 'a'));
+ Assert.assertEquals("", schemaManager.removeContinuousChar("xxxxxxxx", 'x'));
+
+ // Test empty and null strings
+ Assert.assertNull(schemaManager.removeContinuousChar(null, 'a'));
+ Assert.assertEquals("", schemaManager.removeContinuousChar("", 'a'));
+
+ // Test single character strings
+ Assert.assertEquals("b", schemaManager.removeContinuousChar("b", 'a'));
+
+ // Test removing quotes
+ Assert.assertEquals("abc", schemaManager.removeContinuousChar("\"abc\"", '\"'));
+ Assert.assertEquals("a\"bc\"d", schemaManager.removeContinuousChar("\"a\"bc\"d\"", '\"'));
+ Assert.assertEquals("abc", schemaManager.removeContinuousChar("'abc'", '\''));
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
new file mode 100644
index 000000000..d31ab04a1
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
+
+ private SQLParserSchemaChange schemaChange;
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ JsonDebeziumChangeContext changeContext =
+ new JsonDebeziumChangeContext(
+ dorisOptions,
+ tableMapping,
+ null,
+ null,
+ null,
+ objectMapper,
+ null,
+ lineDelimiter,
+ ignoreUpdateBefore,
+ "",
+ "");
+ schemaChange = new SQLParserSchemaChange(changeContext);
+ }
+
+ @Test
+ public void testExtractDDLListMultipleColumns() throws IOException {
+ String sql0 = "ALTER TABLE `test`.`t1` DROP COLUMN `c11`";
+ String sql1 = "ALTER TABLE `test`.`t1` DROP COLUMN `c3`";
+ String sql2 = "ALTER TABLE `test`.`t1` ADD COLUMN `c12` INT DEFAULT '100'";
+ List srcSqlList = Arrays.asList(sql0, sql1, sql2);
+
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691033764,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23464,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop c11, drop column c3, add c12 int default 100\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c12\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot);
+ for (int i = 0; i < ddlSQLList.size(); i++) {
+ String srcSQL = srcSqlList.get(i);
+ String targetSQL = ddlSQLList.get(i);
+ Assert.assertEquals(srcSQL, targetSQL);
+ }
+ }
+
+ @Test
+ public void testExtractDDLListChangeColumn() throws IOException {
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945030,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6661,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink change column c555 c777 bigint\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot);
+
+ String result = "ALTER TABLE `test`.`t1` RENAME COLUMN `c555` `c777`";
+ Assert.assertEquals(result, ddlSQLList.get(0));
+ }
+
+ @Test
+ public void testExtractDDLListRenameColumn() throws IOException {
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691034519,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23886,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c22 to c33\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c33\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot);
+ Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c22` `c33`", ddlSQLList.get(0));
+ }
+
+ @Test
+ public void testExtractDDlListChangeName() throws IOException {
+ String columnInfo =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710925209991,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000288\",\"pos\":81654,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000288\\\",\\\"pos\\\":81654,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1710925209,\\\"file\\\":\\\"mysql-bin.000288\\\",\\\"pos\\\":81808,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 change age age1 int\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8\\\",\\\"primaryKeyColumnNames\\\":[\\\"name\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8\\\",\\\"length\\\":256,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":false,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"age1\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"length\\\":11,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode record = objectMapper.readTree(columnInfo);
+ List changeNameList = schemaChange.tryParserAlterDDLs(record);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` RENAME COLUMN `age` `age1`", changeNameList.get(0));
+ }
+
+ @Test
+ public void testExtractDDlListChangeNameWithColumn() throws IOException {
+ String columnInfo =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1711088321412,\"snapshot\":\"false\",\"db\":\"doris_test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000292\",\"pos\":55695,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000292\\\",\\\"pos\\\":55695,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1711088321,\\\"file\\\":\\\"mysql-bin.000292\\\",\\\"pos\\\":55891,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1\\\\n change column `key` key_word int default 1 not null\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"length\\\":11,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":false,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"key_word\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"length\\\":11,\\\"position\\\":2,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"1\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode record = objectMapper.readTree(columnInfo);
+ List changeNameList = schemaChange.tryParserAlterDDLs(record);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` RENAME COLUMN `key` `key_word`", changeNameList.get(0));
+ }
+
+ @Test
+ public void testAddDatetimeColumn() throws IOException {
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720596740767,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":10192,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":10192,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1720596740,\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":10405,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink34 add column `create_time` datetime(6) DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'datatime_test'\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink34\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":50,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"decimal_type\\\",\\\"jdbcType\\\":3,\\\"typeName\\\":\\\"DECIMAL\\\",\\\"typeExpression\\\":\\\"DECIMAL\\\",\\\"charsetName\\\":null,\\\"length\\\":38,\\\"scale\\\":9,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"0.123456789\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"create_time\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"length\\\":6,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"1970-01-01 00:00:00\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordJsonNode = objectMapper.readTree(record);
+ List changeNameList = schemaChange.tryParserAlterDDLs(recordJsonNode);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` ADD COLUMN `create_time` DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP COMMENT 'datatime_test'",
+ changeNameList.get(0));
+ }
+
+ @Test
+ public void testDropColumn() throws IOException {
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720599133910,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":12084,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12084,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1720599133,\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12219,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"ALTER TABLE test_sink34 drop column create_time\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink34\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":50,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"decimal_type\\\",\\\"jdbcType\\\":3,\\\"typeName\\\":\\\"DECIMAL\\\",\\\"typeExpression\\\":\\\"DECIMAL\\\",\\\"charsetName\\\":null,\\\"length\\\":38,\\\"scale\\\":9,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"0.123456789\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordJsonNode = objectMapper.readTree(record);
+ List changeNameList = schemaChange.tryParserAlterDDLs(recordJsonNode);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` DROP COLUMN `create_time`", changeNameList.get(0));
+ }
+
+ @Test
+ public void testChangeColumn() throws IOException {
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720598926291,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":11804,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":11804,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1720598926,\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12007,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"ALTER TABLE test_sink34 CHANGE COLUMN `create_time2` `create_time` datetime(6) DEFAULT CURRENT_TIMESTAMP(6)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink34\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":50,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"decimal_type\\\",\\\"jdbcType\\\":3,\\\"typeName\\\":\\\"DECIMAL\\\",\\\"typeExpression\\\":\\\"DECIMAL\\\",\\\"charsetName\\\":null,\\\"length\\\":38,\\\"scale\\\":9,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"0.123456789\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"create_time\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"length\\\":6,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"1970-01-01 00:00:00\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordJsonNode = objectMapper.readTree(record);
+ List changeNameList = schemaChange.tryParserAlterDDLs(recordJsonNode);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`t1` RENAME COLUMN `create_time2` `create_time`",
+ changeNameList.get(0));
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 2410ddaca..07744e37c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -20,6 +20,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.doris.flink.sink.schema.SchemaChangeMode;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import java.util.HashMap;
@@ -71,7 +72,8 @@ public static void main(String[] args) throws Exception {
String multiToOneOrigin = "a_.*|b_.*";
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
- boolean useNewSchemaChange = false;
+ boolean useNewSchemaChange = true;
+ String schemaChangeMode = SchemaChangeMode.DEBEZIUM_STRUCTURE.getName();
boolean singleSink = false;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
@@ -90,6 +92,7 @@ public static void main(String[] args) throws Exception {
.setTableConfig(tableConfig)
.setCreateTableOnly(false)
.setNewSchemaChange(useNewSchemaChange)
+ .setSchemaChangeMode(schemaChangeMode)
.setSingleSink(singleSink)
.setIgnoreIncompatible(ignoreIncompatible)
.create();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index fba5866cb..35a5719ad 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -76,7 +76,7 @@ public static void main(String[] args) throws Exception {
String multiToOneOrigin = "a_.*|b_.*";
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
- boolean useNewSchemaChange = false;
+ boolean useNewSchemaChange = true;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new OracleDatabaseSync();
databaseSync
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 6c933409d..4d5a56f7c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception {
String multiToOneOrigin = "a_.*|b_.*";
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
- boolean useNewSchemaChange = false;
+ boolean useNewSchemaChange = true;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new PostgresDatabaseSync();
databaseSync
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 9fec63b69..3d6e1e991 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -76,7 +76,7 @@ public static void main(String[] args) throws Exception {
String multiToOneOrigin = "a_.*|b_.*";
String multiToOneTarget = "a|b";
boolean ignoreDefaultValue = false;
- boolean useNewSchemaChange = false;
+ boolean useNewSchemaChange = true;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new SqlServerDatabaseSync();
databaseSync