diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java index a6e921587..79d6b2d7a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java @@ -50,7 +50,7 @@ import org.apache.flink.util.StringUtils; import org.apache.commons.compress.utils.Lists; -import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; @@ -353,15 +353,14 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } List primaryKeys = getCreateDorisKeys(table.getSchema()); - TableSchema schema = new TableSchema(); - schema.setDatabase(tablePath.getDatabaseName()); - schema.setTable(tablePath.getObjectName()); - schema.setTableComment(table.getComment()); - schema.setFields(getCreateDorisColumns(table.getSchema())); - schema.setKeys(primaryKeys); - schema.setModel(DataModel.UNIQUE); - schema.setDistributeKeys(primaryKeys); - schema.setProperties(getCreateTableProps(options)); + TableSchema schema = + DorisSchemaFactory.createTableSchema( + tablePath.getDatabaseName(), + tablePath.getObjectName(), + getCreateDorisColumns(table.getSchema()), + primaryKeys, + getCreateTableProps(options), + table.getComment()); dorisSystem.createTable(schema); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java new file mode 100644 index 000000000..b621cfdc6 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.catalog.doris; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.regex.Pattern; + +/** + * Factory that creates doris schema. + * + *

In the case where doris schema needs to be created, it is best to create it through this + * factory + */ +public class DorisSchemaFactory { + private static Map tableBucketMap; + + public static TableSchema createTableSchema( + String database, + String table, + Map columnFields, + List pkKeys, + Map tableProperties, + String tableComment) { + TableSchema tableSchema = new TableSchema(); + tableSchema.setDatabase(database); + tableSchema.setTable(table); + tableSchema.setModel( + CollectionUtils.isEmpty(pkKeys) ? DataModel.DUPLICATE : DataModel.UNIQUE); + tableSchema.setFields(columnFields); + tableSchema.setKeys(buildKeys(pkKeys, columnFields)); + tableSchema.setTableComment(tableComment); + tableSchema.setDistributeKeys(buildDistributeKeys(pkKeys, columnFields)); + tableSchema.setProperties(tableProperties); + if (tableProperties.containsKey("table-buckets")) { + if (MapUtils.isEmpty(tableBucketMap)) { + String tableBucketsConfig = tableProperties.get("table-buckets"); + tableBucketMap = buildTableBucketMap(tableBucketsConfig); + } + Integer buckets = parseTableSchemaBuckets(tableBucketMap, table); + tableSchema.setTableBuckets(buckets); + } + return tableSchema; + } + + private static List buildDistributeKeys( + List primaryKeys, Map fields) { + return buildKeys(primaryKeys, fields); + } + + /** + * Theoretically, the duplicate table of doris does not need to distinguish the key column, but + * in the actual table creation statement, the key column will be automatically added. So if it + * is a duplicate table, primaryKeys is empty, and we uniformly take the first field as the key. + */ + private static List buildKeys( + List primaryKeys, Map fields) { + if (CollectionUtils.isNotEmpty(primaryKeys)) { + return primaryKeys; + } + if (!fields.isEmpty()) { + Entry firstField = fields.entrySet().iterator().next(); + return Collections.singletonList(firstField.getKey()); + } + return new ArrayList<>(); + } + + @VisibleForTesting + public static Integer parseTableSchemaBuckets( + Map tableBucketsMap, String tableName) { + if (tableBucketsMap != null) { + // Firstly, if the table name is in the table-buckets map, set the buckets of the table. + if (tableBucketsMap.containsKey(tableName)) { + return tableBucketsMap.get(tableName); + } + // Secondly, iterate over the map to find a corresponding regular expression match. + for (Entry entry : tableBucketsMap.entrySet()) { + Pattern pattern = Pattern.compile(entry.getKey()); + if (pattern.matcher(tableName).matches()) { + return entry.getValue(); + } + } + } + return null; + } + + /** + * Build table bucket Map. + * + * @param tableBuckets the string of tableBuckets, eg:student:10,student_info:20,student.*:30 + * @return The table name and buckets map. The key is table name, the value is buckets. + */ + @VisibleForTesting + public static Map buildTableBucketMap(String tableBuckets) { + Map tableBucketsMap = new LinkedHashMap<>(); + String[] tableBucketsArray = tableBuckets.split(","); + for (String tableBucket : tableBucketsArray) { + String[] tableBucketArray = tableBucket.split(":"); + tableBucketsMap.put( + tableBucketArray[0].trim(), Integer.parseInt(tableBucketArray[1].trim())); + } + return tableBucketsMap; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java index 5acedfc28..c44f74336 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java @@ -31,11 +31,10 @@ import net.sf.jsqlparser.statement.create.table.CreateTable; import net.sf.jsqlparser.statement.create.table.Index; import org.apache.commons.collections.CollectionUtils; -import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils; -import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +58,7 @@ public class SQLParserSchemaManager implements Serializable { * Doris' schema change only supports ADD, DROP, and RENAME operations. This method is only used * to parse the above schema change operations. */ - public List parserAlterDDLs( + public List parseAlterDDLs( SourceConnector sourceConnector, String ddl, String dorisTable) { List ddlList = new ArrayList<>(); try { @@ -137,30 +136,16 @@ public TableSchema parseCreateTableStatement( List indexes = createTable.getIndexes(); extractIndexesPrimaryKey(indexes, pkKeys); - String[] dbTable = dorisTable.split("\\."); Preconditions.checkArgument(dbTable.length == 2); - TableSchema tableSchema = new TableSchema(); - tableSchema.setDatabase(dbTable[0]); - tableSchema.setTable(dbTable[1]); - tableSchema.setModel(pkKeys.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE); - tableSchema.setFields(columnFields); - tableSchema.setKeys(pkKeys); - tableSchema.setTableComment( + + return DorisSchemaFactory.createTableSchema( + dbTable[0], + dbTable[1], + columnFields, + pkKeys, + tableProperties, extractTableComment(createTable.getTableOptionsStrings())); - tableSchema.setDistributeKeys( - JsonDebeziumChangeUtils.buildDistributeKeys(pkKeys, columnFields)); - tableSchema.setProperties(tableProperties); - if (tableProperties.containsKey("table-buckets")) { - String tableBucketsConfig = tableProperties.get("table-buckets"); - Map tableBuckets = - DatabaseSync.getTableBuckets(tableBucketsConfig); - Integer buckets = - JsonDebeziumChangeUtils.getTableSchemaBuckets( - tableBuckets, tableSchema.getTable()); - tableSchema.setTableBuckets(buckets); - } - return tableSchema; } else { LOG.warn( "Unsupported statement type. ddl={}, sourceConnector={}, dorisTable={}", diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java index 0f1769141..571bacfb4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; -import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.SourceSchema; @@ -31,17 +30,7 @@ import org.apache.doris.flink.tools.cdc.postgres.PostgresType; import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.regex.Pattern; - -import static org.apache.doris.flink.tools.cdc.SourceConnector.MYSQL; -import static org.apache.doris.flink.tools.cdc.SourceConnector.ORACLE; -import static org.apache.doris.flink.tools.cdc.SourceConnector.POSTGRES; -import static org.apache.doris.flink.tools.cdc.SourceConnector.SQLSERVER; public class JsonDebeziumChangeUtils { @@ -101,35 +90,4 @@ public static String buildDorisTypeName( } return dorisTypeName; } - - public static List buildDistributeKeys( - List primaryKeys, Map fields) { - if (!CollectionUtil.isNullOrEmpty(primaryKeys)) { - return primaryKeys; - } - if (!fields.isEmpty()) { - Entry firstField = fields.entrySet().iterator().next(); - return Collections.singletonList(firstField.getKey()); - } - return new ArrayList<>(); - } - - public static Integer getTableSchemaBuckets( - Map tableBucketsMap, String tableName) { - if (tableBucketsMap != null) { - // Firstly, if the table name is in the table-buckets map, set the buckets of the table. - if (tableBucketsMap.containsKey(tableName)) { - return tableBucketsMap.get(tableName); - } - // Secondly, iterate over the map to find a corresponding regular expression match, - for (Entry entry : tableBucketsMap.entrySet()) { - - Pattern pattern = Pattern.compile(entry.getKey()); - if (pattern.matcher(tableName).matches()) { - return entry.getValue(); - } - } - } - return null; - } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index f44602c92..3f9858010 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -26,7 +26,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import org.apache.commons.collections.CollectionUtils; -import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.exception.IllegalArgumentException; @@ -37,7 +37,6 @@ import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema; import org.apache.doris.flink.sink.schema.SchemaChangeManager; import org.apache.doris.flink.sink.writer.EventType; -import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -221,37 +220,20 @@ public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessi JsonNode columns = tableChange.get("table").get("columns"); JsonNode comment = tableChange.get("table").get("comment"); String tblComment = comment == null ? "" : comment.asText(); - Map field = new LinkedHashMap<>(); + Map fields = new LinkedHashMap<>(); for (JsonNode column : columns) { - buildFieldSchema(field, column); + buildFieldSchema(fields, column); } List pkList = new ArrayList<>(); for (JsonNode column : pkColumns) { String fieldName = column.asText(); pkList.add(fieldName); } + String[] dbTable = dorisTable.split("\\."); + Preconditions.checkArgument(dbTable.length == 2); - TableSchema tableSchema = new TableSchema(); - tableSchema.setFields(field); - tableSchema.setKeys(pkList); - tableSchema.setDistributeKeys(JsonDebeziumChangeUtils.buildDistributeKeys(pkList, field)); - tableSchema.setTableComment(tblComment); - tableSchema.setProperties(tableProperties); - tableSchema.setModel(pkList.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE); - - String[] split = dorisTable.split("\\."); - Preconditions.checkArgument(split.length == 2); - tableSchema.setDatabase(split[0]); - tableSchema.setTable(split[1]); - if (tableProperties.containsKey("table-buckets")) { - String tableBucketsConfig = tableProperties.get("table-buckets"); - Map tableBuckets = DatabaseSync.getTableBuckets(tableBucketsConfig); - Integer buckets = - JsonDebeziumChangeUtils.getTableSchemaBuckets( - tableBuckets, tableSchema.getTable()); - tableSchema.setTableBuckets(buckets); - } - return tableSchema; + return DorisSchemaFactory.createTableSchema( + dbTable[0], dbTable[1], fields, pkList, tableProperties, tblComment); } private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java index 7e44acc6f..71d4edfe5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java @@ -90,7 +90,7 @@ public boolean schemaChange(JsonNode recordRoot) { LOG.warn("Failed to get doris table tuple. record={}", recordRoot); return false; } - List ddlList = tryParserAlterDDLs(recordRoot); + List ddlList = tryParseAlterDDLs(recordRoot); status = executeAlterDDLs(ddlList, recordRoot, dorisTableTuple, status); } } catch (Exception ex) { @@ -110,12 +110,12 @@ public TableSchema tryParseCreateTableStatement(JsonNode record, String dorisTab } @VisibleForTesting - public List tryParserAlterDDLs(JsonNode record) throws IOException { + public List tryParseAlterDDLs(JsonNode record) throws IOException { String dorisTable = JsonDebeziumChangeUtils.getDorisTableIdentifier(record, dorisOptions, tableMapping); JsonNode historyRecord = extractHistoryRecord(record); String ddl = extractJsonNode(historyRecord, "ddl"); extractSourceConnector(record); - return sqlParserSchemaManager.parserAlterDDLs(sourceConnector, ddl, dorisTable); + return sqlParserSchemaManager.parseAlterDDLs(sourceConnector, ddl, dorisTable); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 9c4f2ac40..4abe8c61f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; +import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisConnectionOptions; @@ -128,13 +129,7 @@ public void build() throws Exception { } List syncTables = new ArrayList<>(); List> dorisTables = new ArrayList<>(); - Map tableBucketsMap = null; - if (tableConfig.containsKey("table-buckets")) { - tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets")); - } - // Set of table names that have assigned bucket numbers. - Set tablesWithBucketsAssigned = new HashSet<>(); Set targetDbSet = new HashSet<>(); for (SourceSchema schema : schemaList) { syncTables.add(schema.getTableName()); @@ -153,13 +148,7 @@ public void build() throws Exception { // Calculate the mapping relationship between upstream and downstream tables tableMapping.put( schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable)); - tryCreateTableIfAbsent( - dorisSystem, - targetDb, - dorisTable, - schema, - tableBucketsMap, - tablesWithBucketsAssigned); + tryCreateTableIfAbsent(dorisSystem, targetDb, dorisTable, schema); if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) { dorisTables.add(Tuple2.of(targetDb, dorisTable)); @@ -418,6 +407,7 @@ protected HashMap multiToOneRulesParser( * @param tableBuckets the string of tableBuckets, eg:student:10,student_info:20,student.*:30 * @return The table name and buckets map. The key is table name, the value is buckets. */ + @Deprecated public static Map getTableBuckets(String tableBuckets) { Map tableBucketsMap = new LinkedHashMap<>(); String[] tableBucketsArray = tableBuckets.split(","); @@ -438,6 +428,7 @@ public static Map getTableBuckets(String tableBuckets) { * @param dorisTable the table name need to set buckets * @param tableHasSet The buckets table is set */ + @Deprecated public void setTableSchemaBuckets( Map tableBucketsMap, TableSchema dorisSchema, @@ -468,20 +459,16 @@ public void setTableSchemaBuckets( } private void tryCreateTableIfAbsent( - DorisSystem dorisSystem, - String targetDb, - String dorisTable, - SourceSchema schema, - Map tableBucketsMap, - Set tableBucketsSet) { + DorisSystem dorisSystem, String targetDb, String dorisTable, SourceSchema schema) { if (!dorisSystem.tableExists(targetDb, dorisTable)) { - TableSchema dorisSchema = schema.convertTableSchema(tableConfig); - dorisSchema.setDatabase(targetDb); - dorisSchema.setTable(dorisTable); - // set the table buckets of table - if (tableBucketsMap != null) { - setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, tableBucketsSet); - } + TableSchema dorisSchema = + DorisSchemaFactory.createTableSchema( + database, + dorisTable, + schema.getFields(), + schema.getPrimaryKeys(), + tableConfig, + schema.getTableComment()); try { dorisSystem.createTable(dorisSchema); } catch (Exception ex) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java index cbef59fa2..de3e7975b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java @@ -72,6 +72,7 @@ public static String getString(String databaseName, String schemaName, String ta return identifier.toString(); } + @Deprecated public TableSchema convertTableSchema(Map tableProps) { TableSchema tableSchema = new TableSchema(); tableSchema.setModel(this.model); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java new file mode 100644 index 000000000..29e8d4611 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.catalog.doris; + +import org.apache.flink.util.Preconditions; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class DorisSchemaFactoryTest { + + @Test + public void testParseTableSchemaBuckets() { + Assert.assertNull(DorisSchemaFactory.parseTableSchemaBuckets(null, null)); + Map map = new HashMap<>(); + Assert.assertNull(DorisSchemaFactory.parseTableSchemaBuckets(map, null)); + map.put("tbl1", 1); + Assert.assertEquals(DorisSchemaFactory.parseTableSchemaBuckets(map, "tbl1").intValue(), 1); + map = new HashMap<>(); + map.put("tbl2.*", 1); + Assert.assertEquals(DorisSchemaFactory.parseTableSchemaBuckets(map, "tbl2").intValue(), 1); + Assert.assertEquals(DorisSchemaFactory.parseTableSchemaBuckets(map, "tbl21").intValue(), 1); + } + + @Test + public void testCreateTableSchema() { + String dorisTable = "doris.create_tab"; + String[] dbTable = dorisTable.split("\\."); + Preconditions.checkArgument(dbTable.length == 2); + + Map columnFields = new HashMap<>(); + columnFields.put("id", new FieldSchema("id", "INT", "100", "int_test")); + columnFields.put("name", new FieldSchema("name", "VARVHAR(100)", null, "Name_test")); + columnFields.put("age", new FieldSchema("age", "INT", null, null)); + columnFields.put("email", new FieldSchema("email", "VARCHAR(100)", "email@doris.com", "e")); + List pkKeys = Collections.singletonList("email"); + Map tableProperties = new HashMap<>(); + String tableComment = "auto_tab_comment"; + TableSchema tableSchema = + DorisSchemaFactory.createTableSchema( + dbTable[0], + dbTable[1], + columnFields, + pkKeys, + tableProperties, + tableComment); + Assert.assertEquals( + "TableSchema{database='doris', table='create_tab', tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, id=FieldSchema{name='id', typeString='INT', defaultValue='100', comment='int_test'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(100)', defaultValue='email@doris.com', comment='e'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={}, tableBuckets=null}", + tableSchema.toString()); + } + + @Test + public void testCreateTableSchemaTableBuckets() { + String dorisTable = "doris.create_tab"; + String[] dbTable = dorisTable.split("\\."); + Preconditions.checkArgument(dbTable.length == 2); + + Map columnFields = new HashMap<>(); + columnFields.put("id", new FieldSchema("id", "INT", "100", "int_test")); + columnFields.put("name", new FieldSchema("name", "VARVHAR(100)", null, "Name_test")); + columnFields.put("age", new FieldSchema("age", "INT", null, null)); + columnFields.put("email", new FieldSchema("email", "VARCHAR(100)", "email@doris.com", "e")); + List pkKeys = Collections.singletonList("email"); + Map tableProperties = new HashMap<>(); + tableProperties.put("table-buckets", "create_tab:40, create_taba:10, tabs:12"); + String tableComment = "auto_tab_comment"; + TableSchema tableSchema = + DorisSchemaFactory.createTableSchema( + dbTable[0], + dbTable[1], + columnFields, + pkKeys, + tableProperties, + tableComment); + + Assert.assertEquals( + "TableSchema{database='doris', table='create_tab', tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, id=FieldSchema{name='id', typeString='INT', defaultValue='100', comment='int_test'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(100)', defaultValue='email@doris.com', comment='e'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={table-buckets=create_tab:40, create_taba:10, tabs:12}, tableBuckets=40}", + tableSchema.toString()); + } + + @Test + public void testCreateDuplicateTableSchema() { + String dorisTable = "doris.dup_tab"; + String[] dbTable = dorisTable.split("\\."); + Preconditions.checkArgument(dbTable.length == 2); + + Map columnFields = new HashMap<>(); + columnFields.put("id", new FieldSchema("id", "INT", "100", "int_test")); + columnFields.put("name", new FieldSchema("name", "VARVHAR(100)", null, "Name_test")); + columnFields.put("age", new FieldSchema("age", "INT", null, null)); + columnFields.put("email", new FieldSchema("email", "VARCHAR(100)", "email@doris.com", "e")); + Map tableProperties = new HashMap<>(); + String tableComment = "auto_tab_comment"; + TableSchema tableSchema = + DorisSchemaFactory.createTableSchema( + dbTable[0], + dbTable[1], + columnFields, + new ArrayList<>(), + tableProperties, + tableComment); + Assert.assertEquals( + "TableSchema{database='doris', table='dup_tab', tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, id=FieldSchema{name='id', typeString='INT', defaultValue='100', comment='int_test'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(100)', defaultValue='email@doris.com', comment='e'}}, keys=name, model=DUPLICATE, distributeKeys=name, properties={}, tableBuckets=null}", + tableSchema.toString()); + } + + @Test + public void buildTableBucketMapTest() { + String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50"; + Map tableBucketsMap = DorisSchemaFactory.buildTableBucketMap(tableBuckets); + assertEquals(10, tableBucketsMap.get("tbl1").intValue()); + assertEquals(20, tableBucketsMap.get("tbl2").intValue()); + assertEquals(30, tableBucketsMap.get("a.*").intValue()); + assertEquals(40, tableBucketsMap.get("b.*").intValue()); + assertEquals(50, tableBucketsMap.get(".*").intValue()); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java index 941bca463..d101d1e8c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java @@ -55,7 +55,7 @@ public void testParserAlterDDLs() { SourceConnector mysql = SourceConnector.MYSQL; String ddl = "alter table t1 drop c1, drop column c2, add c3 int default 100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment', add `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change column c12 c13 varchar(10)"; - List actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl, dorisTable); + List actualDDLs = schemaManager.parseAlterDDLs(mysql, ddl, dorisTable); for (String actualDDL : actualDDLs) { Assert.assertTrue(expectDDLs.contains(actualDDL)); } @@ -70,7 +70,7 @@ public void testParserAlterDDLsAdd() { SourceConnector mysql = SourceConnector.ORACLE; String ddl = "ALTER TABLE employees ADD (phone_number VARCHAR2(20), address VARCHAR2(255));"; - List actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl, dorisTable); + List actualDDLs = schemaManager.parseAlterDDLs(mysql, ddl, dorisTable); for (String actualDDL : actualDDLs) { Assert.assertTrue(expectDDLs.contains(actualDDL)); } @@ -89,7 +89,7 @@ public void testParserAlterDDLsChange() { + "CHANGE COLUMN old_phone_number new_phone_number VARCHAR(20) NOT NULL,\n" + "CHANGE COLUMN old_address new_address VARCHAR(255) DEFAULT 'Unknown',\n" + "MODIFY COLUMN hire_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;"; - List actualDDLs = schemaManager.parserAlterDDLs(mysql, ddl, dorisTable); + List actualDDLs = schemaManager.parseAlterDDLs(mysql, ddl, dorisTable); for (String actualDDL : actualDDLs) { Assert.assertTrue(expectDDLs.contains(actualDDL)); } @@ -258,7 +258,7 @@ public void testParseCreateDuplicateTableStatement() { SourceConnector.MYSQL, ddl, dorisTable, new HashMap<>()); String expected = - "TableSchema{database='doris', table='auto_duptab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_duptab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -338,7 +338,7 @@ public void testParseOracleDuplicateTableStatement() { SourceConnector.ORACLE, ddl, dorisTable, new HashMap<>()); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, total_amount=FieldSchema{name='total_amount', typeString='DECIMALV3(12,2)', defaultValue='null', comment='null'}, shipping_address=FieldSchema{name='shipping_address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}, delivery_date=FieldSchema{name='delivery_date', typeString='DATETIMEV2', defaultValue='null', comment='null'}}, keys=, model=DUPLICATE, distributeKeys=order_id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, total_amount=FieldSchema{name='total_amount', typeString='DECIMALV3(12,2)', defaultValue='null', comment='null'}, shipping_address=FieldSchema{name='shipping_address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}, delivery_date=FieldSchema{name='delivery_date', typeString='DATETIMEV2', defaultValue='null', comment='null'}}, keys=order_id, model=DUPLICATE, distributeKeys=order_id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java deleted file mode 100644 index c2a93893b..000000000 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeUtils.java +++ /dev/null @@ -1,43 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.flink.sink.writer.serializer.jsondebezium; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -public class TestJsonDebeziumChangeUtils { - - @Test - public void testGetTableSchemaBuckets() { - Assert.assertNull(JsonDebeziumChangeUtils.getTableSchemaBuckets(null, null)); - Map map = new HashMap<>(); - Assert.assertNull(JsonDebeziumChangeUtils.getTableSchemaBuckets(map, null)); - map.put("tbl1", 1); - Assert.assertEquals( - JsonDebeziumChangeUtils.getTableSchemaBuckets(map, "tbl1").intValue(), 1); - map = new HashMap<>(); - map.put("tbl2.*", 1); - Assert.assertEquals( - JsonDebeziumChangeUtils.getTableSchemaBuckets(map, "tbl2").intValue(), 1); - Assert.assertEquals( - JsonDebeziumChangeUtils.getTableSchemaBuckets(map, "tbl21").intValue(), 1); - } -} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java index 7fdf97d2d..aa1333da7 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java @@ -61,7 +61,7 @@ public void testExtractDDLListMultipleColumns() throws IOException { String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691033764,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23464,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop c11, drop column c3, add c12 int default 100\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c12\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; JsonNode recordRoot = objectMapper.readTree(record); - List ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot); + List ddlSQLList = schemaChange.tryParseAlterDDLs(recordRoot); for (int i = 0; i < ddlSQLList.size(); i++) { String srcSQL = srcSqlList.get(i); String targetSQL = ddlSQLList.get(i); @@ -74,7 +74,7 @@ public void testExtractDDLListChangeColumn() throws IOException { String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945030,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6661,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink change column c555 c777 bigint\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; JsonNode recordRoot = objectMapper.readTree(record); - List ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot); + List ddlSQLList = schemaChange.tryParseAlterDDLs(recordRoot); String result = "ALTER TABLE `test`.`t1` RENAME COLUMN `c555` `c777`"; Assert.assertEquals(result, ddlSQLList.get(0)); @@ -85,7 +85,7 @@ public void testExtractDDLListRenameColumn() throws IOException { String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691034519,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23886,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c22 to c33\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c33\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; JsonNode recordRoot = objectMapper.readTree(record); - List ddlSQLList = schemaChange.tryParserAlterDDLs(recordRoot); + List ddlSQLList = schemaChange.tryParseAlterDDLs(recordRoot); Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c22` `c33`", ddlSQLList.get(0)); } @@ -94,7 +94,7 @@ public void testExtractDDlListChangeName() throws IOException { String columnInfo = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710925209991,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000288\",\"pos\":81654,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000288\\\",\\\"pos\\\":81654,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1710925209,\\\"file\\\":\\\"mysql-bin.000288\\\",\\\"pos\\\":81808,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 change age age1 int\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8\\\",\\\"primaryKeyColumnNames\\\":[\\\"name\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8\\\",\\\"length\\\":256,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":false,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"age1\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"length\\\":11,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; JsonNode record = objectMapper.readTree(columnInfo); - List changeNameList = schemaChange.tryParserAlterDDLs(record); + List changeNameList = schemaChange.tryParseAlterDDLs(record); Assert.assertEquals( "ALTER TABLE `test`.`t1` RENAME COLUMN `age` `age1`", changeNameList.get(0)); } @@ -104,7 +104,7 @@ public void testExtractDDlListChangeNameWithColumn() throws IOException { String columnInfo = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1711088321412,\"snapshot\":\"false\",\"db\":\"doris_test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000292\",\"pos\":55695,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"mysql-bin.000292\\\",\\\"pos\\\":55695,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1711088321,\\\"file\\\":\\\"mysql-bin.000292\\\",\\\"pos\\\":55891,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1\\\\n change column `key` key_word int default 1 not null\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"length\\\":11,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":false,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"key_word\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"length\\\":11,\\\"position\\\":2,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"1\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; JsonNode record = objectMapper.readTree(columnInfo); - List changeNameList = schemaChange.tryParserAlterDDLs(record); + List changeNameList = schemaChange.tryParseAlterDDLs(record); Assert.assertEquals( "ALTER TABLE `test`.`t1` RENAME COLUMN `key` `key_word`", changeNameList.get(0)); } @@ -114,7 +114,7 @@ public void testAddDatetimeColumn() throws IOException { String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720596740767,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":10192,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":10192,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1720596740,\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":10405,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink34 add column `create_time` datetime(6) DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'datatime_test'\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink34\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":50,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"decimal_type\\\",\\\"jdbcType\\\":3,\\\"typeName\\\":\\\"DECIMAL\\\",\\\"typeExpression\\\":\\\"DECIMAL\\\",\\\"charsetName\\\":null,\\\"length\\\":38,\\\"scale\\\":9,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"0.123456789\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"create_time\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"length\\\":6,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"1970-01-01 00:00:00\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; JsonNode recordJsonNode = objectMapper.readTree(record); - List changeNameList = schemaChange.tryParserAlterDDLs(recordJsonNode); + List changeNameList = schemaChange.tryParseAlterDDLs(recordJsonNode); Assert.assertEquals( "ALTER TABLE `test`.`t1` ADD COLUMN `create_time` DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP COMMENT 'datatime_test'", changeNameList.get(0)); @@ -125,7 +125,7 @@ public void testDropColumn() throws IOException { String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720599133910,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":12084,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12084,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1720599133,\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12219,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"ALTER TABLE test_sink34 drop column create_time\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink34\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":50,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"decimal_type\\\",\\\"jdbcType\\\":3,\\\"typeName\\\":\\\"DECIMAL\\\",\\\"typeExpression\\\":\\\"DECIMAL\\\",\\\"charsetName\\\":null,\\\"length\\\":38,\\\"scale\\\":9,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"0.123456789\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; JsonNode recordJsonNode = objectMapper.readTree(record); - List changeNameList = schemaChange.tryParserAlterDDLs(recordJsonNode); + List changeNameList = schemaChange.tryParseAlterDDLs(recordJsonNode); Assert.assertEquals( "ALTER TABLE `test`.`t1` DROP COLUMN `create_time`", changeNameList.get(0)); } @@ -135,7 +135,7 @@ public void testChangeColumn() throws IOException { String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1720598926291,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink34\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000065\",\"pos\":11804,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":11804,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1720598926,\\\"file\\\":\\\"binlog.000065\\\",\\\"pos\\\":12007,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"ALTER TABLE test_sink34 CHANGE COLUMN `create_time2` `create_time` datetime(6) DEFAULT CURRENT_TIMESTAMP(6)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink34\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":50,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"decimal_type\\\",\\\"jdbcType\\\":3,\\\"typeName\\\":\\\"DECIMAL\\\",\\\"typeExpression\\\":\\\"DECIMAL\\\",\\\"charsetName\\\":null,\\\"length\\\":38,\\\"scale\\\":9,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"0.123456789\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"create_time\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"length\\\":6,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"1970-01-01 00:00:00\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; JsonNode recordJsonNode = objectMapper.readTree(record); - List changeNameList = schemaChange.tryParserAlterDDLs(recordJsonNode); + List changeNameList = schemaChange.tryParseAlterDDLs(recordJsonNode); Assert.assertEquals( "ALTER TABLE `test`.`t1` RENAME COLUMN `create_time2` `create_time`", changeNameList.get(0)); @@ -175,7 +175,7 @@ public void testAutoCreateDuplicateTable() throws IOException { schemaChange.tryParseCreateTableStatement( recordJsonNode, "doris.auto_duplicate_tab"); String expected = - "TableSchema{database='doris', table='auto_duplicate_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_duplicate_tab', tableComment='null', fields={id=FieldSchema{name='id', typeString='INT', defaultValue='null', comment='null'}, name=FieldSchema{name='name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distributeKeys=id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } }