diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index b5080fb07..052180c41 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -92,6 +92,7 @@ under the License. 1.17.6 4.12 1.3 + 4.9 @@ -354,6 +355,11 @@ under the License. ${flink.version} test + + com.github.jsqlparser + jsqlparser + ${jsqlparser.version} + diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index ab26e3086..0d33eb9f2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -285,6 +285,9 @@ private static List identifier(List name) { } public static String identifier(String name) { + if (name.startsWith("`") && name.endsWith("`")) { + return name; + } return "`" + name + "`"; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java new file mode 100644 index 000000000..6f157cdcb --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.schema; + +import org.apache.flink.annotation.VisibleForTesting; + +import net.sf.jsqlparser.JSQLParserException; +import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.alter.Alter; +import net.sf.jsqlparser.statement.alter.AlterExpression; +import net.sf.jsqlparser.statement.alter.AlterExpression.ColumnDataType; +import net.sf.jsqlparser.statement.alter.AlterOperation; +import net.sf.jsqlparser.statement.create.table.ColDataType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils; +import org.apache.doris.flink.tools.cdc.SourceConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** Use {@link net.sf.jsqlparser.parser.CCJSqlParserUtil} to parse SQL statements. */ +public class SQLParserSchemaManager implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(SQLParserSchemaManager.class); + private static final String DEFAULT = "DEFAULT"; + private static final String COMMENT = "COMMENT"; + + /** + * Doris' schema change only supports ADD, DROP, and RENAME operations. This method is only used + * to parse the above schema change operations. + */ + public List parserAlterDDLs( + SourceConnector sourceConnector, String ddl, String dorisTable) { + List ddlList = new ArrayList<>(); + try { + Statement statement = CCJSqlParserUtil.parse(ddl); + if (statement instanceof Alter) { + Alter alterStatement = (Alter) statement; + List alterExpressions = alterStatement.getAlterExpressions(); + for (AlterExpression alterExpression : alterExpressions) { + AlterOperation operation = alterExpression.getOperation(); + switch (operation) { + case DROP: + String dropColumnDDL = + processDropColumnOperation(alterExpression, dorisTable); + ddlList.add(dropColumnDDL); + break; + case ADD: + List addColumnDDL = + processAddColumnOperation( + sourceConnector, alterExpression, dorisTable); + ddlList.addAll(addColumnDDL); + break; + case CHANGE: + String changeColumnDDL = + processChangeColumnOperation(alterExpression, dorisTable); + ddlList.add(changeColumnDDL); + break; + case RENAME: + String renameColumnDDL = + processRenameColumnOperation(alterExpression, dorisTable); + ddlList.add(renameColumnDDL); + break; + default: + LOG.warn( + "Unsupported alter ddl operations, operation={}, ddl={}", + operation.name(), + ddl); + } + } + } else { + LOG.warn("Unsupported ddl operations, ddl={}", ddl); + } + } catch (JSQLParserException e) { + LOG.warn("Failed to parse DDL SQL, SQL={}", ddl, e); + } + return ddlList; + } + + private String processDropColumnOperation(AlterExpression alterExpression, String dorisTable) { + String dropColumnDDL = + SchemaChangeHelper.buildDropColumnDDL(dorisTable, alterExpression.getColumnName()); + LOG.info("Parsed drop column DDL SQL is: {}", dropColumnDDL); + return dropColumnDDL; + } + + private List processAddColumnOperation( + SourceConnector sourceConnector, AlterExpression alterExpression, String dorisTable) { + List colDataTypeList = alterExpression.getColDataTypeList(); + List addColumnList = new ArrayList<>(); + for (ColumnDataType columnDataType : colDataTypeList) { + String columnName = columnDataType.getColumnName(); + ColDataType colDataType = columnDataType.getColDataType(); + String datatype = colDataType.getDataType(); + Integer length = null; + Integer scale = null; + if (CollectionUtils.isNotEmpty(colDataType.getArgumentsStringList())) { + List argumentsStringList = colDataType.getArgumentsStringList(); + length = Integer.parseInt(argumentsStringList.get(0)); + if (argumentsStringList.size() == 2) { + scale = Integer.parseInt(argumentsStringList.get(1)); + } + } + datatype = + JsonDebeziumChangeUtils.buildDorisTypeName( + sourceConnector, datatype, length, scale); + + List columnSpecs = columnDataType.getColumnSpecs(); + String defaultValue = extractDefaultValue(columnSpecs); + String comment = extractComment(columnSpecs); + FieldSchema fieldSchema = new FieldSchema(columnName, datatype, defaultValue, comment); + String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema); + LOG.info("Parsed add column DDL SQL is: {}", addColumnDDL); + addColumnList.add(addColumnDDL); + } + return addColumnList; + } + + private String processChangeColumnOperation( + AlterExpression alterExpression, String dorisTable) { + String columnNewName = alterExpression.getColDataTypeList().get(0).getColumnName(); + String columnOldName = alterExpression.getColumnOldName(); + String renameColumnDDL = + SchemaChangeHelper.buildRenameColumnDDL(dorisTable, columnOldName, columnNewName); + LOG.warn( + "Note: Only rename column names are supported in doris. " + + "Therefore, the change syntax used here only supports the use of rename." + + " Parsed change column DDL SQL is: {}", + renameColumnDDL); + return renameColumnDDL; + } + + private String processRenameColumnOperation( + AlterExpression alterExpression, String dorisTable) { + String columnNewName = alterExpression.getColumnName(); + String columnOldName = alterExpression.getColumnOldName(); + String renameColumnDDL = + SchemaChangeHelper.buildRenameColumnDDL(dorisTable, columnOldName, columnNewName); + LOG.info("Parsed rename column DDL SQL is: {}", renameColumnDDL); + return renameColumnDDL; + } + + @VisibleForTesting + public String extractDefaultValue(List columnSpecs) { + return extractAdjacentString(columnSpecs, DEFAULT); + } + + private String extractAdjacentString(List columnSpecs, String key) { + int columnSpecsSize = columnSpecs.size(); + for (int i = 0; i < columnSpecsSize; i++) { + String columnSpec = columnSpecs.get(i); + if (key.equalsIgnoreCase(columnSpec) && i < columnSpecsSize - 1) { + String adjacentString = columnSpecs.get(i + 1); + if (!(DEFAULT.equalsIgnoreCase(adjacentString)) + && !(COMMENT.equalsIgnoreCase(adjacentString))) { + return removeQuotes(adjacentString); + } + LOG.warn( + "Failed to extract adjacent string value. columnSpecs={}, key={}", + String.join(",", columnSpecs), + key); + } + } + return null; + } + + @VisibleForTesting + public String extractComment(List columnSpecs) { + return extractAdjacentString(columnSpecs, COMMENT); + } + + private String removeQuotes(String content) { + content = removeContinuousChar(content, '\''); + content = removeContinuousChar(content, '\"'); + return content; + } + + /** + * remove the continuous char in the string from both sides. + * + * @param str the input string, target the char to be removed + * @return the string without continuous chars from both sides + */ + @VisibleForTesting + public String removeContinuousChar(String str, char target) { + if (str == null || str.length() < 2) { + return str; + } + int start = 0; + int end = str.length() - 1; + while (start <= end && str.charAt(start) == target) { + start++; + } + while (end >= start && str.charAt(end) == target) { + end--; + } + return str.substring(start, end + 1); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index 8d365ffcc..065468779 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -17,10 +17,9 @@ package org.apache.doris.flink.sink.schema; -import org.apache.flink.util.StringUtils; - import org.apache.commons.collections.CollectionUtils; import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.FieldSchema; @@ -179,7 +178,7 @@ public static String buildModifyColumnDataTypeDDL( } private static void commentColumn(StringBuilder ddl, String comment) { - if (!StringUtils.isNullOrWhitespaceOnly(comment)) { + if (StringUtils.isNotEmpty(comment)) { ddl.append(" COMMENT '").append(DorisSystem.quoteComment(comment)).append("'"); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeMode.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeMode.java new file mode 100644 index 000000000..e55a4d311 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeMode.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.schema; + +public enum SchemaChangeMode { + DEBEZIUM_STRUCTURE("debezium_structure"), + SQL_PARSER("sql_parser"); + + private final String name; + + SchemaChangeMode(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index e657864ab..c1ed1de1a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -29,12 +29,14 @@ import com.fasterxml.jackson.databind.node.NullNode; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.sink.schema.SchemaChangeMode; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2; +import org.apache.doris.flink.sink.writer.serializer.jsondebezium.SQLParserSchemaChange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +78,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer initTableSet = new HashSet<>(); public JsonDebeziumSchemaSerializer( @@ -120,13 +123,15 @@ public JsonDebeziumSchemaSerializer( Map tableProperties, String targetDatabase, String targetTablePrefix, - String targetTableSuffix) { + String targetTableSuffix, + SchemaChangeMode schemaChangeMode) { this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions); this.tableMapping = tableMapping; this.tableProperties = tableProperties; this.targetDatabase = targetDatabase; this.targetTablePrefix = targetTablePrefix; this.targetTableSuffix = targetTableSuffix; + this.schemaChangeMode = schemaChangeMode; init(); } @@ -144,13 +149,29 @@ private void init() { ignoreUpdateBefore, targetTablePrefix, targetTableSuffix); - this.schemaChange = - newSchemaChange - ? new JsonDebeziumSchemaChangeImplV2(changeContext) - : new JsonDebeziumSchemaChangeImpl(changeContext); + initSchemaChangeInstance(changeContext); this.dataChange = new JsonDebeziumDataChange(changeContext); } + private void initSchemaChangeInstance(JsonDebeziumChangeContext changeContext) { + if (!newSchemaChange) { + LOG.info( + "newSchemaChange set to false, instantiation schema change uses JsonDebeziumSchemaChangeImpl."); + this.schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext); + return; + } + + if (Objects.nonNull(schemaChangeMode) + && SchemaChangeMode.SQL_PARSER.equals(schemaChangeMode)) { + LOG.info( + "SchemaChangeMode set to SQL_PARSER, instantiation schema change uses SQLParserService."); + this.schemaChange = new SQLParserSchemaChange(changeContext); + } else { + LOG.info("instantiation schema change uses JsonDebeziumSchemaChangeImplV2."); + this.schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext); + } + } + @Override public DorisRecord serialize(String record) throws IOException { LOG.debug("received debezium json data {} :", record); @@ -201,6 +222,7 @@ public static class Builder { private Pattern addDropDDLPattern; private String sourceTableName; private boolean newSchemaChange = true; + private SchemaChangeMode schemaChangeMode; private DorisExecutionOptions executionOptions; private Map tableMapping; private Map tableProperties; @@ -218,6 +240,14 @@ public JsonDebeziumSchemaSerializer.Builder setNewSchemaChange(boolean newSchema return this; } + public JsonDebeziumSchemaSerializer.Builder setSchemaChangeMode(String schemaChangeMode) { + if (org.apache.commons.lang3.StringUtils.isEmpty(schemaChangeMode)) { + return this; + } + this.schemaChangeMode = SchemaChangeMode.valueOf(schemaChangeMode.toUpperCase()); + return this; + } + public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) { this.addDropDDLPattern = addDropDDLPattern; return this; @@ -273,7 +303,8 @@ public JsonDebeziumSchemaSerializer build() { tableProperties, targetDatabase, targetTablePrefix, - targetTableSuffix); + targetTableSuffix, + schemaChangeMode); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java index a7253d2f0..2a3eebe07 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java @@ -38,8 +38,8 @@ public class JsonDebeziumChangeContext implements Serializable { private final Pattern pattern; private final String lineDelimiter; private final boolean ignoreUpdateBefore; - private String targetTablePrefix; - private String targetTableSuffix; + private final String targetTablePrefix; + private final String targetTableSuffix; public JsonDebeziumChangeContext( DorisOptions dorisOptions, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java index 921607df6..36acecd42 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java @@ -23,10 +23,20 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.mysql.MysqlType; +import org.apache.doris.flink.tools.cdc.oracle.OracleType; +import org.apache.doris.flink.tools.cdc.postgres.PostgresType; +import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType; import java.util.Map; +import static org.apache.doris.flink.tools.cdc.SourceConnector.MYSQL; +import static org.apache.doris.flink.tools.cdc.SourceConnector.ORACLE; +import static org.apache.doris.flink.tools.cdc.SourceConnector.POSTGRES; +import static org.apache.doris.flink.tools.cdc.SourceConnector.SQLSERVER; + public class JsonDebeziumChangeUtils { public static String getDorisTableIdentifier( @@ -62,4 +72,27 @@ public static String extractJsonNode(JsonNode record, String key) { ? record.get(key).asText() : null; } + + public static String buildDorisTypeName( + SourceConnector sourceConnector, String dataType, Integer length, Integer scale) { + String dorisTypeName; + switch (sourceConnector) { + case MYSQL: + dorisTypeName = MysqlType.toDorisType(dataType, length, scale); + break; + case ORACLE: + dorisTypeName = OracleType.toDorisType(dataType, length, scale); + break; + case POSTGRES: + dorisTypeName = PostgresType.toDorisType(dataType, length, scale); + break; + case SQLSERVER: + dorisTypeName = SqlServerType.toDorisType(dataType, length, scale); + break; + default: + String errMsg = sourceConnector + " not support " + dataType + " schema change."; + throw new UnsupportedOperationException(errMsg); + } + return dorisTypeName; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java index ccb204693..a2164b726 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java @@ -25,11 +25,20 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.NullNode; +import org.apache.commons.collections.CollectionUtils; import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.sink.schema.SchemaChangeManager; +import org.apache.doris.flink.sink.writer.EventType; +import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.regex.Pattern; /** @@ -43,6 +52,7 @@ * be enabled by configuring use-new-schema-change. */ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange { + private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChange.class); protected static String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*"; protected Pattern addDropDDLPattern; @@ -55,6 +65,7 @@ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange { protected Map tableMapping; protected SchemaChangeManager schemaChangeManager; protected JsonDebeziumChangeContext changeContext; + protected SourceConnector sourceConnector; public abstract boolean schemaChange(JsonNode recordRoot); @@ -89,6 +100,12 @@ protected String extractJsonNode(JsonNode record, String key) { : null; } + /** + * Parse doris database and table as a tuple. + * + * @param record from flink cdc. + * @return Tuple(database, table) + */ protected Tuple2 getDorisTableTuple(JsonNode record) { String identifier = JsonDebeziumChangeUtils.getDorisTableIdentifier(record, dorisOptions, tableMapping); @@ -120,6 +137,58 @@ protected JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingEx return record; } + /** Parse event type. */ + protected EventType extractEventType(JsonNode record) throws JsonProcessingException { + JsonNode tableChange = extractTableChange(record); + if (tableChange == null || tableChange.get("type") == null) { + return null; + } + String type = tableChange.get("type").asText(); + if (EventType.ALTER.toString().equalsIgnoreCase(type)) { + return EventType.ALTER; + } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) { + return EventType.CREATE; + } + LOG.warn("Not supported this event type. type={}", type); + return null; + } + + protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException { + JsonNode historyRecord = extractHistoryRecord(record); + JsonNode tableChanges = historyRecord.get("tableChanges"); + if (Objects.nonNull(tableChanges)) { + return tableChanges.get(0); + } + LOG.warn("Failed to extract tableChanges. record={}", record); + return null; + } + + protected boolean executeAlterDDLs( + List ddlSqlList, + JsonNode recordRoot, + Tuple2 dorisTableTuple, + boolean status) + throws IOException, IllegalArgumentException { + if (CollectionUtils.isEmpty(ddlSqlList)) { + LOG.info("The recordRoot cannot extract ddl sql. recordRoot={}", recordRoot); + return false; + } + + for (String ddlSql : ddlSqlList) { + status = schemaChangeManager.execute(ddlSql, dorisTableTuple.f0); + LOG.info("schema change status:{}, ddl: {}", status, ddlSql); + } + return status; + } + + protected void extractSourceConnector(JsonNode record) { + if (Objects.isNull(sourceConnector)) { + sourceConnector = + SourceConnector.valueOf( + record.get("source").get("connector").asText().toUpperCase()); + } + } + public Map getTableMapping() { return tableMapping; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java index 614f06a71..09f0f3a69 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java @@ -35,7 +35,16 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -/** Use expression to match ddl sql. */ +/** + * Use expression to match ddl sql. + * + *

The way of parsing DDL statements relies on regular expression matching, and this parsing + * method has many flaws. In order to solve this problem, we introduced the com.github.jsqlparser + * framework, which can accurately parse the schema change of DDL. + * + *

This class is no longer recommended, we recommend using {@link SQLParserSchemaChange} + */ +@Deprecated public class JsonDebeziumSchemaChangeImpl extends JsonDebeziumSchemaChange { private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChangeImpl.class); // alter table tbl add cloumn aca int diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index 9b41e2fdb..7ef975e2b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -40,10 +40,6 @@ import org.apache.doris.flink.sink.writer.EventType; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceConnector; -import org.apache.doris.flink.tools.cdc.mysql.MysqlType; -import org.apache.doris.flink.tools.cdc.oracle.OracleType; -import org.apache.doris.flink.tools.cdc.postgres.PostgresType; -import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +51,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.regex.Matcher; @@ -74,12 +71,11 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange { Pattern.CASE_INSENSITIVE); // schemaChange saves table names, field, and field column information private Map> originFieldSchemaMap = new LinkedHashMap<>(); - private SourceConnector sourceConnector; // create table properties private final Map tableProperties; - private String targetDatabase; - private String targetTablePrefix; - private String targetTableSuffix; + private final String targetDatabase; + private final String targetTablePrefix; + private final String targetTableSuffix; private final Set filledTables = new HashSet<>(); public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) { @@ -124,6 +120,7 @@ public boolean schemaChange(JsonNode recordRoot) { EventType eventType = extractEventType(recordRoot); if (eventType == null) { + LOG.warn("Failed to parse eventType. recordRoot={}", recordRoot); return false; } if (eventType.equals(EventType.CREATE)) { @@ -137,43 +134,20 @@ public boolean schemaChange(JsonNode recordRoot) { LOG.info("create table ddl status: {}", status); } } else if (eventType.equals(EventType.ALTER)) { - // db,table - Tuple2 tuple = getDorisTableTuple(recordRoot); - if (tuple == null) { + Tuple2 dorisTableTuple = getDorisTableTuple(recordRoot); + if (dorisTableTuple == null) { + LOG.warn("Failed to get doris table tuple. record={}", recordRoot); return false; } List ddlSqlList = extractDDLList(recordRoot); - if (CollectionUtils.isEmpty(ddlSqlList)) { - LOG.info("ddl can not do schema change:{}", recordRoot); - return false; - } - List ddlSchemas = SchemaChangeHelper.getDdlSchemas(); - for (int i = 0; i < ddlSqlList.size(); i++) { - DDLSchema ddlSchema = ddlSchemas.get(i); - String ddlSql = ddlSqlList.get(i); - boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema); - status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0); - LOG.info("schema change status:{}, ddl:{}", status, ddlSql); - } - } else { - LOG.info("Unsupported event type {}", eventType); + status = executeAlterDDLs(ddlSqlList, recordRoot, dorisTableTuple, status); } } catch (Exception ex) { - LOG.warn("schema change error :", ex); + LOG.warn("schema change error : ", ex); } return status; } - private JsonNode extractTableChange(JsonNode record) throws JsonProcessingException { - JsonNode historyRecord = extractHistoryRecord(record); - JsonNode tableChanges = historyRecord.get("tableChanges"); - if (!Objects.isNull(tableChanges)) { - JsonNode tableChange = tableChanges.get(0); - return tableChange; - } - return null; - } - /** Parse Alter Event. */ @VisibleForTesting public List extractDDLList(JsonNode record) throws IOException { @@ -181,11 +155,19 @@ public List extractDDLList(JsonNode record) throws IOException { JsonDebeziumChangeUtils.getDorisTableIdentifier(record, dorisOptions, tableMapping); JsonNode historyRecord = extractHistoryRecord(record); String ddl = extractJsonNode(historyRecord, "ddl"); + extractSourceConnector(record); + return parserDebeziumStructure(dorisTable, ddl, record); + } + + private List parserDebeziumStructure(String dorisTable, String ddl, JsonNode record) + throws JsonProcessingException { JsonNode tableChange = extractTableChange(record); - EventType eventType = extractEventType(record); - if (Objects.isNull(tableChange) - || Objects.isNull(ddl) - || !eventType.equals(EventType.ALTER)) { + if (Objects.isNull(tableChange) || Objects.isNull(ddl)) { + LOG.warn( + "tableChange or ddl is empty, cannot do schema change. dorisTable={}, tableChange={}, ddl={}", + dorisTable, + tableChange, + ddl); return null; } @@ -284,7 +266,7 @@ public Integer getTableSchemaBuckets(Map tableBucketsMap, Strin return tableBucketsMap.get(tableName); } // Secondly, iterate over the map to find a corresponding regular expression match, - for (Map.Entry entry : tableBucketsMap.entrySet()) { + for (Entry entry : tableBucketsMap.entrySet()) { Pattern pattern = Pattern.compile(entry.getKey()); if (pattern.matcher(tableName).matches()) { @@ -301,7 +283,7 @@ private List buildDistributeKeys( return primaryKeys; } if (!fields.isEmpty()) { - Map.Entry firstField = fields.entrySet().iterator().next(); + Entry firstField = fields.entrySet().iterator().next(); return Collections.singletonList(firstField.getKey()); } return new ArrayList<>(); @@ -320,21 +302,6 @@ private boolean checkSchemaChange(String database, String table, DDLSchema ddlSc return schemaChangeManager.checkSchemaChange(database, table, param); } - /** Parse event type. */ - protected EventType extractEventType(JsonNode record) throws JsonProcessingException { - JsonNode tableChange = extractTableChange(record); - if (tableChange == null || tableChange.get("type") == null) { - return null; - } - String type = tableChange.get("type").asText(); - if (EventType.ALTER.toString().equalsIgnoreCase(type)) { - return EventType.ALTER; - } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) { - return EventType.CREATE; - } - return null; - } - private Map extractBeforeRow(JsonNode record) { return extractRow(record.get("before")); } @@ -402,25 +369,8 @@ public String buildDorisTypeName(JsonNode column) { int length = column.get("length") == null ? 0 : column.get("length").asInt(); int scale = column.get("scale") == null ? 0 : column.get("scale").asInt(); String sourceTypeName = column.get("typeName").asText(); - String dorisTypeName; - switch (sourceConnector) { - case MYSQL: - dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, scale); - break; - case ORACLE: - dorisTypeName = OracleType.toDorisType(sourceTypeName, length, scale); - break; - case POSTGRES: - dorisTypeName = PostgresType.toDorisType(sourceTypeName, length, scale); - break; - case SQLSERVER: - dorisTypeName = SqlServerType.toDorisType(sourceTypeName, length, scale); - break; - default: - String errMsg = "Not support " + sourceTypeName + " schema change."; - throw new UnsupportedOperationException(errMsg); - } - return dorisTypeName; + return JsonDebeziumChangeUtils.buildDorisTypeName( + sourceConnector, sourceTypeName, length, scale); } private String handleDefaultValue(String defaultValue) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java new file mode 100644 index 000000000..6be3f72c8 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer.serializer.jsondebezium; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.doris.flink.sink.schema.SQLParserSchemaManager; +import org.apache.doris.flink.sink.schema.SchemaChangeManager; +import org.apache.doris.flink.sink.writer.EventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public class SQLParserSchemaChange extends JsonDebeziumSchemaChange { + private static final Logger LOG = LoggerFactory.getLogger(SQLParserSchemaChange.class); + private final SQLParserSchemaManager sqlParserSchemaManager; + + public SQLParserSchemaChange(JsonDebeziumChangeContext changeContext) { + this.changeContext = changeContext; + this.dorisOptions = changeContext.getDorisOptions(); + this.schemaChangeManager = new SchemaChangeManager(dorisOptions); + this.sqlParserSchemaManager = new SQLParserSchemaManager(); + this.tableMapping = changeContext.getTableMapping(); + this.objectMapper = changeContext.getObjectMapper(); + } + + @Override + public void init(JsonNode recordRoot, String dorisTableName) { + // do nothing + } + + @Override + public boolean schemaChange(JsonNode recordRoot) { + boolean status = false; + try { + if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) { + return false; + } + + EventType eventType = extractEventType(recordRoot); + if (eventType == null) { + LOG.warn("Failed to parse eventType. recordRoot={}", recordRoot); + return false; + } + + if (eventType.equals(EventType.CREATE)) { + // TODO support auto create table + LOG.warn("Not auto support create table. recordRoot={}", recordRoot); + } else if (eventType.equals(EventType.ALTER)) { + Tuple2 dorisTableTuple = getDorisTableTuple(recordRoot); + if (dorisTableTuple == null) { + LOG.warn("Failed to get doris table tuple. record={}", recordRoot); + return false; + } + List ddlList = tryParserAlterDDLs(recordRoot); + status = executeAlterDDLs(ddlList, recordRoot, dorisTableTuple, status); + } + } catch (Exception ex) { + LOG.warn("schema change error : ", ex); + } + return status; + } + + @VisibleForTesting + public List tryParserAlterDDLs(JsonNode record) throws IOException { + String dorisTable = + JsonDebeziumChangeUtils.getDorisTableIdentifier(record, dorisOptions, tableMapping); + JsonNode historyRecord = extractHistoryRecord(record); + String ddl = extractJsonNode(historyRecord, "ddl"); + extractSourceConnector(record); + return sqlParserSchemaManager.parserAlterDDLs(sourceConnector, ddl, dorisTable); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 38b942ea4..55e864ca4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -130,6 +130,7 @@ private static void syncDatabase( String excludingTables = params.get("excluding-tables"); String multiToOneOrigin = params.get("multi-to-one-origin"); String multiToOneTarget = params.get("multi-to-one-target"); + String schemaChangeMode = params.get("schema-change-mode"); boolean createTableOnly = params.has("create-table-only"); boolean ignoreDefaultValue = params.has("ignore-default-value"); boolean ignoreIncompatible = params.has("ignore-incompatible"); @@ -157,6 +158,7 @@ private static void syncDatabase( .setCreateTableOnly(createTableOnly) .setSingleSink(singleSink) .setIgnoreIncompatible(ignoreIncompatible) + .setSchemaChangeMode(schemaChangeMode) .create(); databaseSync.build(); if (StringUtils.isNullOrWhitespaceOnly(jobName)) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 5cea70f9c..9c4f2ac40 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -78,6 +78,7 @@ public abstract class DatabaseSync { public StreamExecutionEnvironment env; private boolean createTableOnly = false; private boolean newSchemaChange = true; + private String schemaChangeMode; protected String includingTables; protected String excludingTables; protected String multiToOneOrigin; @@ -342,6 +343,7 @@ public DorisRecordSerializer buildSchemaSerializer( return JsonDebeziumSchemaSerializer.builder() .setDorisOptions(dorisBuilder.build()) .setNewSchemaChange(newSchemaChange) + .setSchemaChangeMode(schemaChangeMode) .setExecutionOptions(executionOptions) .setTableMapping(tableMapping) .setTableProperties(tableConfig) @@ -560,6 +562,11 @@ public DatabaseSync setNewSchemaChange(boolean newSchemaChange) { return this; } + public DatabaseSync setSchemaChangeMode(String schemaChangeMode) { + this.schemaChangeMode = schemaChangeMode.trim(); + return this; + } + public DatabaseSync setSingleSink(boolean singleSink) { this.singleSink = singleSink; return this; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java new file mode 100644 index 000000000..cbe3f08ab --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.schema; + +import org.apache.doris.flink.tools.cdc.SourceConnector; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class SQLParserSchemaManagerTest { + private SQLParserSchemaManager schemaManager; + private String dorisTable; + + @Before + public void init() { + schemaManager = new SQLParserSchemaManager(); + dorisTable = "doris.tab"; + } + + @Test + public void testParserAlterDDLs() { + List expectDDLs = new ArrayList<>(); + expectDDLs.add("ALTER TABLE `doris`.`tab` DROP COLUMN `c1`"); + expectDDLs.add("ALTER TABLE `doris`.`tab` DROP COLUMN `c2`"); + expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `c3` INT DEFAULT '100'"); + expectDDLs.add( + "ALTER TABLE `doris`.`tab` ADD COLUMN `decimal_type` DECIMALV3(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment'"); + expectDDLs.add( + "ALTER TABLE `doris`.`tab` ADD COLUMN `create_time` DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP COMMENT 'time_comment'"); + expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c10` `c11`"); + expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c12` `c13`"); + + SourceConnector mysql = SourceConnector.MYSQL; + String ddl = + "alter table t1 drop c1, drop column c2, add c3 int default 100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment', add `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change column c12 c13 varchar(10)"; + List actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl, dorisTable); + for (String actualDDL : actualDDLs) { + Assert.assertTrue(expectDDLs.contains(actualDDL)); + } + } + + @Test + public void testParserAlterDDLsAdd() { + List expectDDLs = new ArrayList<>(); + expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `phone_number` VARCHAR(60)"); + expectDDLs.add("ALTER TABLE `doris`.`tab` ADD COLUMN `address` VARCHAR(765)"); + + SourceConnector mysql = SourceConnector.ORACLE; + String ddl = + "ALTER TABLE employees ADD (phone_number VARCHAR2(20), address VARCHAR2(255));"; + List actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl, dorisTable); + for (String actualDDL : actualDDLs) { + Assert.assertTrue(expectDDLs.contains(actualDDL)); + } + } + + @Test + public void testParserAlterDDLsChange() { + List expectDDLs = new ArrayList<>(); + expectDDLs.add( + "ALTER TABLE `doris`.`tab` RENAME COLUMN `old_phone_number` `new_phone_number`"); + expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `old_address` `new_address`"); + + SourceConnector mysql = SourceConnector.MYSQL; + String ddl = + "ALTER TABLE employees\n" + + "CHANGE COLUMN old_phone_number new_phone_number VARCHAR(20) NOT NULL,\n" + + "CHANGE COLUMN old_address new_address VARCHAR(255) DEFAULT 'Unknown',\n" + + "MODIFY COLUMN hire_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;"; + List actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl, dorisTable); + for (String actualDDL : actualDDLs) { + Assert.assertTrue(expectDDLs.contains(actualDDL)); + } + } + + @Test + public void testExtractCommentValue() { + String expectComment = ""; + List columnSpecs = Arrays.asList("default", "'100'", "COMMENT", "''"); + String actualComment = schemaManager.extractComment(columnSpecs); + Assert.assertEquals(expectComment, actualComment); + } + + @Test + public void testExtractCommentValueQuotes() { + String expectComment = "comment_test"; + List columnSpecs = + Arrays.asList("Default", "\"100\"", "comment", "\"comment_test\""); + String actualComment = schemaManager.extractComment(columnSpecs); + Assert.assertEquals(expectComment, actualComment); + } + + @Test + public void testExtractCommentValueNull() { + List columnSpecs = Arrays.asList("default", null, "CommenT", null); + String actualComment = schemaManager.extractComment(columnSpecs); + Assert.assertNull(actualComment); + } + + @Test + public void testExtractCommentValueEmpty() { + List columnSpecs = Arrays.asList("default", null, "comment"); + String actualComment = schemaManager.extractComment(columnSpecs); + Assert.assertNull(actualComment); + } + + @Test + public void testExtractCommentValueA() { + String expectComment = "test"; + List columnSpecs = Arrays.asList("comment", "test"); + String actualComment = schemaManager.extractComment(columnSpecs); + Assert.assertEquals(expectComment, actualComment); + } + + @Test + public void testExtractDefaultValue() { + String expectDefault = "100"; + List columnSpecs = Arrays.asList("default", "'100'", "comment", ""); + String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + Assert.assertEquals(expectDefault, actualDefault); + } + + @Test + public void testExtractDefaultValueQuotes() { + String expectDefault = "100"; + List columnSpecs = Arrays.asList("default", "\"100\"", "comment", ""); + String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + Assert.assertEquals(expectDefault, actualDefault); + } + + @Test + public void testExtractDefaultValueNull() { + List columnSpecs = Arrays.asList("Default", null, "comment", null); + String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + Assert.assertNull(actualDefault); + } + + @Test + public void testExtractDefaultValueEmpty() { + String expectDefault = null; + List columnSpecs = Arrays.asList("DEFAULT", "comment", null); + String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + Assert.assertEquals(expectDefault, actualDefault); + } + + @Test + public void testExtractDefaultValueA() { + String expectDefault = "aaa"; + List columnSpecs = Arrays.asList("default", "aaa"); + String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + Assert.assertEquals(expectDefault, actualDefault); + } + + @Test + public void testExtractDefaultValueNULL() { + List columnSpecs = Collections.singletonList("default"); + String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + Assert.assertNull(actualDefault); + } + + @Test + public void testRemoveContinuousChar() { + // Test removing continuous target characters from both ends + Assert.assertEquals("bc", schemaManager.removeContinuousChar("aaaabcaaa", 'a')); + Assert.assertEquals("bcde", schemaManager.removeContinuousChar("abcdea", 'a')); + + // Test cases with no target character + Assert.assertEquals("abc", schemaManager.removeContinuousChar("abc", 'x')); + + // Test cases with only target characters + Assert.assertEquals("", schemaManager.removeContinuousChar("aaaa", 'a')); + Assert.assertEquals("", schemaManager.removeContinuousChar("xxxxxxxx", 'x')); + + // Test empty and null strings + Assert.assertNull(schemaManager.removeContinuousChar(null, 'a')); + Assert.assertEquals("", schemaManager.removeContinuousChar("", 'a')); + + // Test single character strings + Assert.assertEquals("b", schemaManager.removeContinuousChar("b", 'a')); + + // Test removing quotes + Assert.assertEquals("abc", schemaManager.removeContinuousChar("\"abc\"", '\"')); + Assert.assertEquals("a\"bc\"d", schemaManager.removeContinuousChar("\"a\"bc\"d\"", '\"')); + Assert.assertEquals("abc", schemaManager.removeContinuousChar("'abc'", '\'')); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java new file mode 100644 index 000000000..d31ab04a1 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer.serializer.jsondebezium; + +import com.fasterxml.jackson.databind.JsonNode; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase { + + private SQLParserSchemaChange schemaChange; + + @Before + public void setUp() { + super.setUp(); + JsonDebeziumChangeContext changeContext = + new JsonDebeziumChangeContext( + dorisOptions, + tableMapping, + null, + null, + null, + objectMapper, + null, + lineDelimiter, + ignoreUpdateBefore, + "", + ""); + schemaChange = new SQLParserSchemaChange(changeContext); + } + + @Test + public void testExtractDDLListMultipleColumns() throws IOException { + String sql0 = "ALTER TABLE `test`.`t1` DROP COLUMN `c11`"; + String sql1 = "ALTER TABLE `test`.`t1` DROP COLUMN `c3`"; + String sql2 = "ALTER TABLE `test`.`t1` ADD COLUMN `c12` INT DEFAULT '100'"; + List srcSqlList = Arrays.asList(sql0, sql1, sql2); + + String record = + "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691033764,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23464,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop c11, drop column c3, add c12 int default 100\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c12\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; + JsonNode recordRoot = objectMapper.readTree(record); + List ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot); + for (int i = 0; i < ddlSQLList.size(); i++) { + String srcSQL = srcSqlList.get(i); + String targetSQL = ddlSQLList.get(i); + Assert.assertEquals(srcSQL, targetSQL); + } + } + + @Test + public void testExtractDDLListChangeColumn() throws IOException { + String record = + "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945030,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6661,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink change column c555 c777 bigint\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; + JsonNode recordRoot = objectMapper.readTree(record); + List ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot); + + String result = "ALTER TABLE `test`.`t1` RENAME COLUMN `c555` `c777`"; + Assert.assertEquals(result, ddlSQLList.get(0)); + } + + @Test + public void testExtractDDLListRenameColumn() throws IOException { + String record = + "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691034519,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23886,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c22 to c33\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c33\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; + JsonNode recordRoot = objectMapper.readTree(record); + List ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot); + Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c22` `c33`", ddlSQLList.get(0)); + } + + @Test + public void testExtractDDlListChangeName() throws IOException { + String columnInfo = + "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710925209991,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000288\",\"pos\":81654,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000288\\\",\\\"pos\\\":81654,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1710925209,\\\"file\\\":\\\"mysql-bin.000288\\\",\\\"pos\\\":81808,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 change age age1 int\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8\\\",\\\"primaryKeyColumnNames\\\":[\\\"name\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8\\\",\\\"length\\\":256,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":false,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"age1\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"length\\\":11,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; + JsonNode record = objectMapper.readTree(columnInfo); + List changeNameList = schemaChange.tryParserAlterDDLs(record); + Assert.assertEquals( + "ALTER TABLE `test`.`t1` RENAME COLUMN `age` `age1`", changeNameList.get(0)); + } + + @Test + public void testExtractDDlListChangeNameWithColumn() throws IOException { + String columnInfo = + "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1711088321412,\"snapshot\":\"false\",\"db\":\"doris_test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000292\",\"pos\":55695,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000292\\\",\\\"pos\\\":55695,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1711088321,\\\"file\\\":\\\"mysql-bin.000292\\\",\\\"pos\\\":55891,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1\\\\n change column `key` key_word int default 1 not null\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"length\\\":11,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":false,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"key_word\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"length\\\":11,\\\"position\\\":2,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"1\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; + JsonNode record = objectMapper.readTree(columnInfo); + List changeNameList = schemaChange.tryParserAlterDDLs(record); + Assert.assertEquals( + "ALTER TABLE `test`.`t1` RENAME COLUMN `key` `key_word`", changeNameList.get(0)); + } + + @Test + public void testAddDatetimeColumn() throws IOException { + String record = + "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720596740767,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":10192,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":10192,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1720596740,\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":10405,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink34 add column `create_time` datetime(6) DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'datatime_test'\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink34\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":50,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"decimal_type\\\",\\\"jdbcType\\\":3,\\\"typeName\\\":\\\"DECIMAL\\\",\\\"typeExpression\\\":\\\"DECIMAL\\\",\\\"charsetName\\\":null,\\\"length\\\":38,\\\"scale\\\":9,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"0.123456789\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"create_time\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"length\\\":6,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"1970-01-01 00:00:00\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; + JsonNode recordJsonNode = objectMapper.readTree(record); + List changeNameList = schemaChange.tryParserAlterDDLs(recordJsonNode); + Assert.assertEquals( + "ALTER TABLE `test`.`t1` ADD COLUMN `create_time` DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP COMMENT 'datatime_test'", + changeNameList.get(0)); + } + + @Test + public void testDropColumn() throws IOException { + String record = + "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720599133910,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":12084,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12084,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1720599133,\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12219,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"ALTER TABLE test_sink34 drop column create_time\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink34\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":50,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"decimal_type\\\",\\\"jdbcType\\\":3,\\\"typeName\\\":\\\"DECIMAL\\\",\\\"typeExpression\\\":\\\"DECIMAL\\\",\\\"charsetName\\\":null,\\\"length\\\":38,\\\"scale\\\":9,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"0.123456789\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; + JsonNode recordJsonNode = objectMapper.readTree(record); + List changeNameList = schemaChange.tryParserAlterDDLs(recordJsonNode); + Assert.assertEquals( + "ALTER TABLE `test`.`t1` DROP COLUMN `create_time`", changeNameList.get(0)); + } + + @Test + public void testChangeColumn() throws IOException { + String record = + "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720598926291,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":11804,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":11804,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1720598926,\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12007,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"ALTER TABLE test_sink34 CHANGE COLUMN `create_time2` `create_time` datetime(6) DEFAULT CURRENT_TIMESTAMP(6)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink34\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":50,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"decimal_type\\\",\\\"jdbcType\\\":3,\\\"typeName\\\":\\\"DECIMAL\\\",\\\"typeExpression\\\":\\\"DECIMAL\\\",\\\"charsetName\\\":null,\\\"length\\\":38,\\\"scale\\\":9,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"0.123456789\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"create_time\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"length\\\":6,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"1970-01-01 00:00:00\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; + JsonNode recordJsonNode = objectMapper.readTree(record); + List changeNameList = schemaChange.tryParserAlterDDLs(recordJsonNode); + Assert.assertEquals( + "ALTER TABLE `test`.`t1` RENAME COLUMN `create_time2` `create_time`", + changeNameList.get(0)); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index 2410ddaca..07744e37c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.doris.flink.sink.schema.SchemaChangeMode; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; import java.util.HashMap; @@ -71,7 +72,8 @@ public static void main(String[] args) throws Exception { String multiToOneOrigin = "a_.*|b_.*"; String multiToOneTarget = "a|b"; boolean ignoreDefaultValue = false; - boolean useNewSchemaChange = false; + boolean useNewSchemaChange = true; + String schemaChangeMode = SchemaChangeMode.DEBEZIUM_STRUCTURE.getName(); boolean singleSink = false; boolean ignoreIncompatible = false; DatabaseSync databaseSync = new MysqlDatabaseSync(); @@ -90,6 +92,7 @@ public static void main(String[] args) throws Exception { .setTableConfig(tableConfig) .setCreateTableOnly(false) .setNewSchemaChange(useNewSchemaChange) + .setSchemaChangeMode(schemaChangeMode) .setSingleSink(singleSink) .setIgnoreIncompatible(ignoreIncompatible) .create(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java index fba5866cb..35a5719ad 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java @@ -76,7 +76,7 @@ public static void main(String[] args) throws Exception { String multiToOneOrigin = "a_.*|b_.*"; String multiToOneTarget = "a|b"; boolean ignoreDefaultValue = false; - boolean useNewSchemaChange = false; + boolean useNewSchemaChange = true; boolean ignoreIncompatible = false; DatabaseSync databaseSync = new OracleDatabaseSync(); databaseSync diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java index 6c933409d..4d5a56f7c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java @@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception { String multiToOneOrigin = "a_.*|b_.*"; String multiToOneTarget = "a|b"; boolean ignoreDefaultValue = false; - boolean useNewSchemaChange = false; + boolean useNewSchemaChange = true; boolean ignoreIncompatible = false; DatabaseSync databaseSync = new PostgresDatabaseSync(); databaseSync diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java index 9fec63b69..3d6e1e991 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java @@ -76,7 +76,7 @@ public static void main(String[] args) throws Exception { String multiToOneOrigin = "a_.*|b_.*"; String multiToOneTarget = "a|b"; boolean ignoreDefaultValue = false; - boolean useNewSchemaChange = false; + boolean useNewSchemaChange = true; boolean ignoreIncompatible = false; DatabaseSync databaseSync = new SqlServerDatabaseSync(); databaseSync