From 0eb3aa7075befc7478e014bbadd754af17deaf75 Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Thu, 30 Nov 2023 17:53:00 +0800
Subject: [PATCH] [Feature](CDC) Add auto create table (#248)

---
 .../doris/flink/sink/writer/EventType.java    |  23 +++
 .../JsonDebeziumSchemaSerializer.java         | 147 ++++++++++++++----
 .../doris/flink/table/DorisConfigOptions.java |   2 +-
 .../doris/flink/tools/cdc/CdcTools.java       |   1 +
 .../doris/flink/tools/cdc/DatabaseSync.java   |  27 +++-
 .../tools/cdc/mysql/MysqlDatabaseSync.java    |  10 +-
 .../tools/cdc/oracle/OracleDatabaseSync.java  |  10 +-
 .../cdc/postgres/PostgresDatabaseSync.java    |  10 +-
 .../cdc/sqlserver/SqlServerDatabaseSync.java  |  10 +-
 .../TestJsonDebeziumSchemaSerializer.java     |  31 +++-
 .../flink/tools/cdc/DatabaseSyncTest.java     |  16 ++
 11 files changed, 244 insertions(+), 43 deletions(-)
 create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
new file mode 100644
index 000000000..d26bf278d
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package org.apache.doris.flink.sink.writer; + +public enum EventType { + ALTER, + CREATE +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 0c4309b98..29d9ceec1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -26,12 +26,14 @@ import com.fasterxml.jackson.databind.node.NullNode; import org.apache.commons.collections.CollectionUtils; import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.sink.schema.SchemaChangeHelper; import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema; import org.apache.doris.flink.sink.schema.SchemaChangeManager; +import org.apache.doris.flink.sink.writer.EventType; import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.apache.doris.flink.tools.cdc.mysql.MysqlType; @@ -41,6 +43,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +51,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -67,7 +71,6 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer tuple = getDorisTableTuple(recordRoot); - if(tuple == null){ + EventType eventType = extractEventType(recordRoot); + if(eventType == null){ return false; } - - List ddlSqlList = extractDDLList(recordRoot); - if (CollectionUtils.isEmpty(ddlSqlList)) { - LOG.info("ddl can not do schema change:{}", recordRoot); - return false; - } - - List ddlSchemas = SchemaChangeHelper.getDdlSchemas(); - for (int i = 0; i < ddlSqlList.size(); i++) { - DDLSchema ddlSchema = ddlSchemas.get(i); - String ddlSql = ddlSqlList.get(i); - boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema); - status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0); - LOG.info("schema change status:{}, ddl:{}", status, ddlSql); + if(eventType.equals(EventType.CREATE)){ + TableSchema tableSchema = extractCreateTableSchema(recordRoot); + status = schemaChangeManager.createTable(tableSchema); + if(status){ + String cdcTbl = getCdcTableIdentifier(recordRoot); + String dorisTbl = getCreateTableIdentifier(recordRoot); + tableMapping.put(cdcTbl, dorisTbl); + LOG.info("create table ddl status: {}", status); + } + } else if (eventType.equals(EventType.ALTER)){ + // db,table + Tuple2 tuple = getDorisTableTuple(recordRoot); + if(tuple == null){ + return false; + } + List ddlSqlList = extractDDLList(recordRoot); + if (CollectionUtils.isEmpty(ddlSqlList)) { + LOG.info("ddl can not do schema change:{}", recordRoot); + return false; + } + List ddlSchemas = 
SchemaChangeHelper.getDdlSchemas();
+                for (int i = 0; i < ddlSqlList.size(); i++) {
+                    DDLSchema ddlSchema = ddlSchemas.get(i);
+                    String ddlSql = ddlSqlList.get(i);
+                    boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
+                    status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
+                    LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
+                }
+            } else{
+                LOG.info("Unsupported event type {}", eventType);
             }
         } catch (Exception ex) {
             LOG.warn("schema change error :", ex);
@@ -240,18 +258,26 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
         return status;
     }
 
+    protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
+        JsonNode historyRecord = extractHistoryRecord(record);
+        JsonNode tableChanges = historyRecord.get("tableChanges");
+        if(!Objects.isNull(tableChanges)){
+            JsonNode tableChange = tableChanges.get(0);
+            return tableChange;
+        }
+        return null;
+    }
+
+    /**
+     * Parse the ALTER event into schema-change DDL.
+     */
     @VisibleForTesting
-    public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
+    public List<String> extractDDLList(JsonNode record) throws IOException{
         String dorisTable = getDorisTableIdentifier(record);
         JsonNode historyRecord = extractHistoryRecord(record);
-        JsonNode tableChanges = historyRecord.get("tableChanges");
         String ddl = extractJsonNode(historyRecord, "ddl");
-        if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
-            return new ArrayList<>();
-        }
-        LOG.debug("received debezium ddl :{}", ddl);
-        JsonNode tableChange = tableChanges.get(0);
-        if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) {
+        JsonNode tableChange = extractTableChange(record);
+        if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
             return null;
         }
 
@@ -285,6 +311,47 @@ public List<String> extractDDLList(JsonNode record) throws JsonProcessingExcepti
         return SchemaChangeHelper.generateDDLSql(dorisTable);
     }
 
+    @VisibleForTesting
+    public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
+        String dorisTable = getCreateTableIdentifier(record);
+        JsonNode tableChange = extractTableChange(record);
+        JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
+        JsonNode columns = tableChange.get("table").get("columns");
+        String tblComment = tableChange.get("table").get("comment").asText();
+        Map<String, FieldSchema> field = new LinkedHashMap<>();
+        for (JsonNode column : columns) {
+            buildFieldSchema(field, column);
+        }
+        List<String> pkList = new ArrayList<>();
+        for(JsonNode column : pkColumns){
+            String fieldName = column.asText();
+            pkList.add(fieldName);
+        }
+
+        TableSchema tableSchema = new TableSchema();
+        tableSchema.setFields(field);
+        tableSchema.setKeys(pkList);
+        tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
+        tableSchema.setTableComment(tblComment);
+
+        String[] split = dorisTable.split("\\.");
+        Preconditions.checkArgument(split.length == 2);
+        tableSchema.setDatabase(split[0]);
+        tableSchema.setTable(split[1]);
+        return tableSchema;
+    }
+
+    private List<String> buildDistributeKeys(List<String> primaryKeys, Map<String, FieldSchema> fields) {
+        if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
+            return primaryKeys;
+        }
+        if(!fields.isEmpty()){
+            Map.Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
+            return Collections.singletonList(firstField.getKey());
+        }
+        return new ArrayList<>();
+    }
+
     @VisibleForTesting
     public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
         this.originFieldSchemaMap = originFieldSchemaMap;
@@ -334,6 +401,12 @@ public String getCdcTableIdentifier(JsonNode record){
         return SourceSchema.getString(db, schema, table);
     }
 
+    public String getCreateTableIdentifier(JsonNode record){
+        String db = extractJsonNode(record.get("source"), "db");
+        String table = extractJsonNode(record.get("source"), "table");
+        return db + "." + table;
+    }
+
     public String getDorisTableIdentifier(String cdcTableIdentifier){
         if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
             return dorisOptions.getTableIdentifier();
@@ -405,6 +478,23 @@ protected String extractTable(JsonNode record) {
         return extractJsonNode(record.get("source"), "table");
     }
 
+    /**
+     * Parse the event type (CREATE or ALTER) from the table change.
+     */
+    protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
+        JsonNode tableChange = extractTableChange(record);
+        if(tableChange == null || tableChange.get("type") == null){
+            return null;
+        }
+        String type = tableChange.get("type").asText();
+        if(EventType.ALTER.toString().equalsIgnoreCase(type)){
+            return EventType.ALTER;
+        }else if(EventType.CREATE.toString().equalsIgnoreCase(type)){
+            return EventType.CREATE;
+        }
+        return null;
+    }
+
     private String extractJsonNode(JsonNode record, String key) {
         return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode)
                 ? record.get(key).asText() : null;
@@ -425,7 +515,7 @@ private Map<String, Object> extractRow(JsonNode recordRow) {
     }
 
     private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
-        if (record.has("historyRecord")) {
+        if (record != null && record.has("historyRecord")) {
             return objectMapper.readTree(record.get("historyRecord").asText());
         }
         // The ddl passed by some scenes will not be included in the historyRecord, such as DebeziumSourceFunction
@@ -452,8 +542,6 @@ public String extractDDL(JsonNode record) throws JsonProcessingException {
         return null;
     }
 
-
-
     @VisibleForTesting
     public void fillOriginSchema(JsonNode columns) {
         if (Objects.nonNull(originFieldSchemaMap)) {
@@ -623,5 +711,4 @@ private String handleType(String type) {
         return type;
     }
 
-
 }
\ No newline at end of file
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index af01ea515..546dc6eb9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -89,7 +89,7 @@ public class DorisConfigOptions {
             .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
             .withDescription("");
     public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
-            .key("doris.request.retriesdoris.deserialize.queue.size")
+            .key("doris.deserialize.queue.size")
             .intType()
             .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
             .withDescription("");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index d05fa15c5..fe35ee3c5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -129,6 +129,7 @@ private static void syncDatabase(MultipleParameterTool params, DatabaseSync data
                 .setTableConfig(tableMap)
                 .setCreateTableOnly(createTableOnly)
                 .setNewSchemaChange(useNewSchemaChange)
+                .setSingleSink(singleSink)
                 .create();
         databaseSync.build();
         if(StringUtils.isNullOrWhitespaceOnly(jobName)){
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index bf262146f..02ab034fa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -45,6 +45,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 public abstract class DatabaseSync {
     private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
@@ -83,6 +84,11 @@ public abstract class DatabaseSync {
 
     public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env);
 
+    /**
+     * Get the prefix of a specific tableList; for example, MySQL uses the database name while Oracle uses the schema name.
+     */
+    public abstract String getTableListPrefix();
+
     public DatabaseSync() throws SQLException {
         registerDriver();
     }
@@ -132,8 +138,7 @@ public void build() throws Exception {
             System.out.println("Create table finished.");
             System.exit(0);
         }
-
-        config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")");
+        config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
         DataStreamSource<String> streamSource = buildCdcSource(env);
         if(singleSink){
             streamSource.sinkTo(buildDorisSink());
@@ -256,6 +261,24 @@ protected boolean isSyncNeeded(String tableName) {
         LOG.debug("table {} is synchronized? {}", tableName, sync);
         return sync;
     }
+
+    protected String getSyncTableList(List<String> syncTables){
+        if(!singleSink){
+            return syncTables.stream()
+                    .map(v-> getTableListPrefix() + "\\." + v)
+                    .collect(Collectors.joining("|"));
+        }else{
+            // build includingTablePattern, plus a negative lookahead on excludingPattern when it is set
+            String includingPattern = String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables);
+            if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
+                return includingPattern;
+            }else{
+                String excludingPattern = String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables);
+                return String.format("(%s)(%s)", includingPattern, excludingPattern);
+            }
+        }
+    }
+
     /**
      * Filter table that many tables merge to one
      */
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 22e49aaa7..a3e01d327 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -122,7 +122,9 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
                 .username(config.get(MySqlSourceOptions.USERNAME))
                 .password(config.get(MySqlSourceOptions.PASSWORD))
                 .databaseList(databaseName)
-                .tableList(databaseName + "." + tableName);
+                .tableList(tableName)
+                // scanning newly added tables is enabled by default
+                .scanNewlyAddedTableEnabled(true);
 
         config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
         config
@@ -215,6 +217,12 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
                 mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
     }
 
+    @Override
+    public String getTableListPrefix() {
+        String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
+        return databaseName;
+    }
+
     /**
      * set chunkkeyColumn,eg: db.table1:column1,db.table2:column2
      * @param sourceBuilder
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 6e27eb6c8..92c1f95c6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -174,7 +174,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
                     .port(port)
                     .databaseList(databaseName)
                     .schemaList(schemaName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .includeSchemaChanges(true)
@@ -199,7 +199,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
                     .password(password)
                     .database(databaseName)
                     .schemaList(schemaName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .debeziumProperties(debeziumProperties)
                     .startupOptions(startupOptions)
                     .deserializer(schema)
@@ -207,4 +207,10 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
             return env.addSource(oracleSource, "Oracle Source");
         }
     }
+
+    @Override
+    public String getTableListPrefix() {
+        String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+        return schemaName;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index b8c9ad1d6..04b41270d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -159,7 +159,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
                     .port(port)
                     .database(databaseName)
                     .schemaList(schemaName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .deserializer(schema)
@@ -185,7 +185,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
                     .port(port)
                     .database(databaseName)
                     .schemaList(schemaName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .debeziumProperties(debeziumProperties)
@@ -196,4 +196,10 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
             return env.addSource(postgresSource, "Postgres Source");
         }
     }
+
+    @Override
+    public String getTableListPrefix() {
+        String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
+        return schemaName;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index fb25212ec..10db3c118 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -153,7 +153,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
                     .hostname(hostname)
                     .port(port)
                     .databaseList(databaseName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .startupOptions(startupOptions)
@@ -175,7 +175,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
                     .hostname(hostname)
                     .port(port)
                     .database(databaseName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .debeziumProperties(debeziumProperties)
@@ -185,4 +185,10 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
             return env.addSource(sqlServerSource, "SqlServer Source");
         }
     }
+
+    @Override
+    public String getTableListPrefix() {
+        String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+        return schemaName;
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index de549ef6d..32aedabfb 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -20,8 +20,9 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -29,9 +30,8 @@
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Field;
 import org.apache.doris.flink.rest.models.Schema;
-
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -198,8 +198,10 @@ public void testExtractDDLListCreateTable() throws IOException {
         String record =
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t\\\"c100\\\" LONG, \\n\\t\\\"c55\\\" VARCHAR2(255), \\n\\t\\\"c77\\\" VARCHAR2(255), \\n\\t PRIMARY KEY (\\\"ID\\\") ENABLE\\n ) ;\\n \",\"tableChanges\":[{\"type\":\"CREATE\",\"id\":\"\\\"HELOWIN\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\",\"table\":{\"defaultCharsetName\":null,\"primaryKeyColumnNames\":[\"ID\"],\"columns\":[{\"name\":\"ID\",\"jdbcType\":2,\"nativeType\":null,\"typeName\":\"NUMBER\",\"typeExpression\":\"NUMBER\",\"charsetName\":null,\"length\":10,\"scale\":0,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"NAME4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"age4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c100\",\"jdbcType\":-1,\"nativeType\":null,\"typeName\":\"LONG\",\"typeExpression\":\"LONG\",\"charsetName\":null,\"length\":0,\"scale\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c55\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c77\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null}],\"comment\":null}}]}"; JsonNode recordRoot = objectMapper.readTree(record); + serializer.setSourceConnector("oracle"); List ddlSQLList = serializer.extractDDLList(recordRoot); Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList)); + serializer.setSourceConnector("mysql"); } @Test @@ -419,4 +421,27 @@ public void testSchemaChangeMultiTable() throws Exception { dorisOptions.setTableIdentifier(tmp); } + @Test + @Ignore + public void testAutoCreateTable() throws Exception { + String record + = "{ \"source\":{ \"version\":\"1.9.7.Final\", \"connector\":\"oracle\", \"name\":\"oracle_logminer\", \"ts_ms\":1696945825065, \"snapshot\":\"true\", \"db\":\"TESTDB\", \"sequence\":null, \"schema\":\"ADMIN\", \"table\":\"PERSONS\", \"txId\":null, \"scn\":\"1199617\", \"commit_scn\":null, \"lcr_position\":null, \"rs_id\":null, \"ssn\":0, \"redo_thread\":null }, \"databaseName\":\"TESTDB\", \"schemaName\":\"ADMIN\", \"ddl\":\"\\n CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t PRIMARY KEY 
(\\\"ID\\\") ENABLE\\n ) ;\\n \", \"tableChanges\":[ { \"type\":\"CREATE\", \"id\":\"\\\"TESTDB\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\", \"table\":{ \"defaultCharsetName\":null, \"primaryKeyColumnNames\":[ \"ID\" ], \"columns\":[ { \"name\":\"ID\", \"jdbcType\":2, \"nativeType\":null, \"typeName\":\"NUMBER\", \"typeExpression\":\"NUMBER\", \"charsetName\":null, \"length\":10, \"scale\":0, \"position\":1, \"optional\":false, \"autoIncremented\":false, \"generated\":false, \"comment\":null }, { \"name\":\"NAME4\", \"jdbcType\":12, \"nativeType\":null, \"typeName\":\"VARCHAR2\", \"typeExpression\":\"VARCHAR2\", \"charsetName\":null, \"length\":128, \"scale\":null, \"position\":2, \"optional\":false, \"autoIncremented\":false, \"generated\":false, \"comment\":null }, { \"name\":\"age4\", \"jdbcType\":12, \"nativeType\":null, \"typeName\":\"VARCHAR2\", \"typeExpression\":\"VARCHAR2\", \"charsetName\":null, \"length\":128, \"scale\":null, \"position\":3, \"optional\":true, \"autoIncremented\":false, \"generated\":false, \"comment\":null } ], \"comment\":null } } ]}"; + JsonNode recordRoot = objectMapper.readTree(record); + dorisOptions = DorisOptions.builder().setFenodes("127.0.0.1:8030") + .setTableIdentifier("") + .setUsername("root") + .setPassword("").build(); + serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build(); + serializer.setSourceConnector(SourceConnector.ORACLE.connectorName); + TableSchema tableSchema = serializer.extractCreateTableSchema(recordRoot); + Assert.assertEquals("TESTDB", tableSchema.getDatabase()); + Assert.assertEquals("PERSONS", tableSchema.getTable()); + Assert.assertArrayEquals(new String[]{"ID"}, tableSchema.getKeys().toArray()); + Assert.assertEquals(3, tableSchema.getFields().size()); + Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName()); + Assert.assertEquals("NAME4", tableSchema.getFields().get("NAME4").getName()); + Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName()); + serializer.setSourceConnector(SourceConnector.MYSQL.connectorName); + } + } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java index daab90b9e..fb2c8d6c0 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java @@ -17,7 +17,10 @@ package org.apache.doris.flink.tools.cdc; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; +import org.apache.flink.configuration.Configuration; +import org.junit.Assert; import org.junit.Test; + import java.util.Arrays; /** @@ -37,4 +40,17 @@ public void multiToOneRulesParserTest() throws Exception{ databaseSync.multiToOneRulesParser(arr[0], arr[1]); }); } + + @Test + public void getSyncTableListTest() throws Exception{ + DatabaseSync databaseSync = new MysqlDatabaseSync(); + databaseSync.setSingleSink(false); + databaseSync.setIncludingTables("tbl_1|tbl_2"); + Configuration config = new Configuration(); + config.setString("database-name", "db"); + config.setString("table-name", "tbl.*"); + databaseSync.setConfig(config); + String syncTableList = databaseSync.getSyncTableList(Arrays.asList("tbl_1", "tbl_2")); + Assert.assertEquals("db\\.tbl_1|db\\.tbl_2", syncTableList); + } }
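Note (not part of the patch): the sketch below illustrates how the new code path walks a Debezium schema-change event — extractTableChange() reads tableChanges[0] from the history record, extractEventType() maps its "type" field onto EventType, and extractCreateTableSchema() turns the column list and primary keys into a Doris TableSchema. The class name, variable names, and the trimmed sample payload here are hypothetical; only Jackson is assumed on the classpath.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DebeziumSchemaEventSketch {
    public static void main(String[] args) throws Exception {
        // Trimmed-down version of the CREATE payload used in testAutoCreateTable above.
        String event = "{\"source\":{\"db\":\"TESTDB\",\"table\":\"PERSONS\"},"
                + "\"tableChanges\":[{\"type\":\"CREATE\",\"table\":{"
                + "\"primaryKeyColumnNames\":[\"ID\"],"
                + "\"columns\":[{\"name\":\"ID\",\"typeName\":\"NUMBER\",\"length\":10,\"scale\":0}]}}]}";
        ObjectMapper mapper = new ObjectMapper();
        JsonNode record = mapper.readTree(event);
        // Some sources wrap the schema history in a "historyRecord" string field; otherwise the record
        // itself carries "tableChanges", mirroring extractHistoryRecord() in the serializer.
        JsonNode history = record.has("historyRecord")
                ? mapper.readTree(record.get("historyRecord").asText()) : record;
        JsonNode tableChange = history.get("tableChanges").get(0);
        String type = tableChange.get("type").asText(); // "CREATE" -> auto create table, "ALTER" -> schema-change DDL
        JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames"); // become key and distribution columns
        JsonNode columns = tableChange.get("table").get("columns");                 // mapped onto Doris FieldSchema entries
        System.out.println(type + " " + record.get("source").get("db").asText() + "."
                + record.get("source").get("table").asText()
                + ", pk=" + pkColumns + ", columns=" + columns.size());
    }
}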