diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java index 9693d433e..dd42f803b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java @@ -23,6 +23,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.ObjectUtils; import org.apache.doris.flink.exception.CreateTableException; import org.apache.doris.flink.tools.cdc.DorisTableConfig; @@ -63,6 +64,10 @@ public static TableSchema createTableSchema( tableSchema.setProperties(dorisTableConfig.getTableProperties()); tableSchema.setTableBuckets( parseTableSchemaBuckets(dorisTableConfig.getTableBuckets(), table)); + if (ObjectUtils.isNotEmpty(dorisTableConfig.getTablePartitions()) + && dorisTableConfig.getTablePartitions().containsKey(table)) { + tableSchema.setPartitionInfo(dorisTableConfig.getTablePartitions().get(table)); + } } return tableSchema; } @@ -123,16 +128,29 @@ public static String generateCreateTableDDL(TableSchema schema) { throw new CreateTableException("key " + key + " not found in column list"); } FieldSchema field = fields.get(key); - buildColumn(sb, field, true); + buildColumn(sb, field, true, false); + } + + // append partition column, auto partition column must be in keys + if (schema.getPartitionInfo() != null) { + String partitionCol = schema.getPartitionInfo().f0; + FieldSchema field = fields.get(partitionCol); + buildColumn(sb, field, true, true); } // append values for (Map.Entry entry : fields.entrySet()) { + // skip key column if (keys.contains(entry.getKey())) { continue; } + // skip partition column + if (schema.getPartitionInfo() != null + && entry.getKey().equals(schema.getPartitionInfo().f0)) { + continue; + } FieldSchema field = entry.getValue(); - buildColumn(sb, field, false); + buildColumn(sb, field, false, false); } sb = sb.deleteCharAt(sb.length() - 1); sb.append(" ) "); @@ -140,8 +158,13 @@ public static String generateCreateTableDDL(TableSchema schema) { if (DataModel.UNIQUE.equals(schema.getModel())) { sb.append(schema.getModel().name()) .append(" KEY(") - .append(String.join(",", identifier(schema.getKeys()))) - .append(")"); + .append(String.join(",", identifier(schema.getKeys()))); + + if (schema.getPartitionInfo() != null) { + sb.append(",").append(identifier(schema.getPartitionInfo().f0)); + } + + sb.append(")"); } // append table comment @@ -149,6 +172,16 @@ public static String generateCreateTableDDL(TableSchema schema) { sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' "); } + // append partition info if exists + if (schema.getPartitionInfo() != null) { + sb.append(" AUTO PARTITION BY RANGE ") + .append( + String.format( + "(date_trunc(`%s`, '%s'))", + schema.getPartitionInfo().f0, schema.getPartitionInfo().f1)) + .append("()"); + } + // append distribute key sb.append(" DISTRIBUTED BY HASH(") .append(String.join(",", identifier(schema.getDistributeKeys()))) @@ -165,6 +198,7 @@ public static String generateCreateTableDDL(TableSchema schema) { } else { sb.append(" BUCKETS AUTO "); } + // append properties int index = 0; for (Map.Entry entry : properties.entrySet()) { @@ -186,13 +220,19 @@ public static String generateCreateTableDDL(TableSchema schema) { return sb.toString(); } - private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) { + private static void buildColumn( + StringBuilder sql, FieldSchema field, boolean isKey, boolean autoPartitionCol) { String fieldType = field.getTypeString(); if (isKey && DorisType.STRING.equals(fieldType)) { fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533); } sql.append(identifier(field.getName())).append(" ").append(fieldType); + // auto partition need set partition-column not null + if (autoPartitionCol) { + sql.append(" NOT NULL "); + } + if (field.getDefaultValue() != null) { sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue())); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java index 3a47a044e..f7617ab19 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java @@ -17,6 +17,8 @@ package org.apache.doris.flink.catalog.doris; +import org.apache.flink.api.java.tuple.Tuple2; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -32,9 +34,11 @@ public class TableSchema { private DataModel model = DataModel.DUPLICATE; private List distributeKeys = new ArrayList<>(); private Map properties = new HashMap<>(); - private Integer tableBuckets; + // Currently only supports auto partition, eg: DATE_TRUNC(column,interval) + private Tuple2 partitionInfo; + public String getDatabase() { return database; } @@ -107,6 +111,14 @@ public Integer getTableBuckets() { return tableBuckets; } + public Tuple2 getPartitionInfo() { + return partitionInfo; + } + + public void setPartitionInfo(Tuple2 partitionInfo) { + this.partitionInfo = partitionInfo; + } + @Override public String toString() { return "TableSchema{" diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 61beea194..e512d790a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -172,7 +172,12 @@ private static void syncDatabase( .setIgnoreIncompatible(ignoreIncompatible) .setSchemaChangeMode(schemaChangeMode) .create(); - databaseSync.build(); + + boolean needExecute = databaseSync.build(); + if (!needExecute) { + // create table only + return; + } if (StringUtils.isNullOrWhitespaceOnly(jobName)) { jobName = String.format( diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 7cd29506e..dea4422c9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -113,7 +113,7 @@ public void create() { this.converter = new TableNameConverter(tablePrefix, tableSuffix, multiToOneRulesPattern); } - public void build() throws Exception { + public boolean build() throws Exception { DorisConnectionOptions options = getDorisConnectionOptions(); DorisSystem dorisSystem = new DorisSystem(options); @@ -156,7 +156,7 @@ public void build() throws Exception { } if (createTableOnly) { System.out.println("Create table finished."); - System.exit(0); + return false; } LOG.info("table mapping: {}", tableMapping); config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables)); @@ -181,6 +181,7 @@ public void build() throws Exception { .uid(uidName); } } + return true; } /** diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java index 6318fc8a5..6f5d929e9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.tools.cdc; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; import java.io.Serializable; import java.util.HashMap; @@ -30,11 +31,14 @@ public class DorisTableConfig implements Serializable { // PROPERTIES parameter in doris table creation statement. such as: replication_num=1. public static final String REPLICATION_NUM = "replication_num"; public static final String TABLE_BUCKETS = "table-buckets"; + public static final String TABLE_PARTITIONS = "table-partitions"; private final Map tableProperties; // The specific parameters extracted from --table-conf need to be parsed and integrated into the // doris table creation statement. such as: table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50". private Map tableBuckets; + // table:partitionColumn:interval + private Map> tablePartitions; // Only for testing @VisibleForTesting @@ -55,6 +59,11 @@ public DorisTableConfig(Map tableConfig) { this.tableBuckets = buildTableBucketMap(tableConfig.get(TABLE_BUCKETS)); tableConfig.remove(TABLE_BUCKETS); } + if (tableConfig.containsKey(TABLE_PARTITIONS)) { + this.tablePartitions = buildTablePartitionMap(tableConfig.get(TABLE_PARTITIONS)); + tableConfig.remove(TABLE_PARTITIONS); + } + tableProperties = tableConfig; } @@ -66,6 +75,10 @@ public Map getTableProperties() { return tableProperties; } + public Map> getTablePartitions() { + return tablePartitions; + } + /** * Build table bucket Map. * @@ -83,4 +96,23 @@ public Map buildTableBucketMap(String tableBuckets) { } return tableBucketsMap; } + + /** + * Build table partition Map. + * + * @param tablePartitions the string of tablePartitions, + * eg:tbl1:dt_column:month,tb2:dt_column:day + * @return The table name and buckets map. The key is table name, the value is partition column + * and interval. + */ + @VisibleForTesting + public Map> buildTablePartitionMap(String tablePartitions) { + Map> tablePartitionMap = new LinkedHashMap<>(); + String[] tablePartitionArray = tablePartitions.split(","); + for (String tablePartition : tablePartitionArray) { + String[] tp = tablePartition.split(":"); + tablePartitionMap.put(tp[0].trim(), Tuple2.of(tp[1].trim(), tp[2].trim())); + } + return tablePartitionMap; + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java index ec536ee68..cd3965a30 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -95,6 +96,9 @@ protected void submitE2EJob(String jobName, String[] args) { LOG.info("{} e2e job will submit to start. ", jobName); CdcTools.setStreamExecutionEnvironmentForTesting(configFlinkEnvironment()); CdcTools.main(args); + if (Arrays.asList(args).contains("--create-table-only")) { + return; + } jobClient = CdcTools.getJobClient(); if (Objects.isNull(jobClient)) { LOG.warn("Failed get flink job client. jobName={}", jobName); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java index 938aa2184..fe715f62d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java @@ -21,11 +21,14 @@ import org.apache.doris.flink.container.ContainerUtils; import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.ResultSet; +import java.sql.Statement; import java.util.Arrays; import java.util.List; @@ -381,6 +384,58 @@ public void testMySQL2DorisEnableDelete() throws Exception { cancelE2EJob(jobName); } + @Test + public void testMySQL2DorisCreateTableOnly() throws Exception { + String jobName = "testMySQL2DorisCreateTableOnly"; + initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql"); + startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt"); + + String createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_uniq"); + Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`id`)")); + Assert.assertTrue(createTblSQL.contains("BUCKETS 10")); + + createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_dup"); + Assert.assertTrue(createTblSQL.contains("DUPLICATE KEY(`id`, `name`)")); + Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO")); + + createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_from_uniqindex"); + Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`name`)")); + Assert.assertTrue(createTblSQL.contains("BUCKETS 30")); + + createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_from_uniqindex2"); + Assert.assertTrue( + createTblSQL.contains("UNIQUE KEY(`id`, `name`)") + || createTblSQL.contains("UNIQUE KEY(`id`, `age`)")); + Assert.assertTrue(createTblSQL.contains("BUCKETS 30")); + + createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_from_multiindex"); + Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`id`)")); + Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO")); + + /* + The auto partition behavior of doris 2.1.0 to 2.1.4 has changed, temporarily skipped + createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_part_uniq"); + Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`id`, `create_dtime`)")); + Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO")); + + createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_part_dup"); + Assert.assertTrue(createTblSQL.contains("DUPLICATE KEY(`id`, `create_dtime`, `name`)")); + Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO")); + */ + } + + private String getCreateTableSQL(String database, String table) throws Exception { + Statement statement = getDorisQueryConnection().createStatement(); + ResultSet resultSet = + statement.executeQuery(String.format("SHOW CREATE TABLE %s.%s", database, table)); + while (resultSet.next()) { + String createTblSql = resultSet.getString(2); + LOG.info("Create table sql: {}", createTblSql.replace("\n", "")); + return createTblSql; + } + throw new RuntimeException("Table not exist " + table); + } + @After public void close() { try { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java index 1a82e1c6e..804e5dbd4 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java @@ -17,6 +17,8 @@ package org.apache.doris.flink.tools.cdc; +import org.apache.flink.api.java.tuple.Tuple2; + import org.junit.Before; import org.junit.Test; @@ -43,4 +45,14 @@ public void buildTableBucketMapTest() { assertEquals(40, tableBucketsMap.get("b.*").intValue()); assertEquals(50, tableBucketsMap.get(".*").intValue()); } + + @Test + public void buildTablePartitionMapTest() { + String tablePartitions = "tbl1:dt_col_d:day,tbl2:dt_col_w:week,tbl3:dt_col_m:month"; + Map> tablePartitionMap = + dorisTableConfig.buildTablePartitionMap(tablePartitions); + assertEquals(Tuple2.of("dt_col_d", "day"), tablePartitionMap.get("tbl1")); + assertEquals(Tuple2.of("dt_col_w", "week"), tablePartitionMap.get("tbl2")); + assertEquals(Tuple2.of("dt_col_m", "month"), tablePartitionMap.get("tbl3")); + } } diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt new file mode 100644 index 000000000..632c37356 --- /dev/null +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt @@ -0,0 +1,5 @@ +mysql-sync-database + --including-tables "create_tbl_.*" + --create-table-only + --table-conf table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30 + --table-conf replication_num=1 \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql new file mode 100644 index 000000000..cc3c16a6f --- /dev/null +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql @@ -0,0 +1,59 @@ +CREATE DATABASE if NOT EXISTS test_e2e_mysql; +DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_uniq; +CREATE TABLE test_e2e_mysql.create_tbl_uniq ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); + +DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_dup; +CREATE TABLE test_e2e_mysql.create_tbl_dup ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL +); + +DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_from_uniqindex; +CREATE TABLE test_e2e_mysql.create_tbl_from_uniqindex ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +UNIQUE KEY `uniq` (`name`) +); + +DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_from_uniqindex2; +CREATE TABLE test_e2e_mysql.create_tbl_from_uniqindex2 ( +`id` int DEFAULT NULL, +`name` varchar(255) DEFAULT NULL, +`age` int DEFAULT NULL, +UNIQUE KEY `idname_uniq` (`id`,`name`), +UNIQUE KEY `idage_uniq` (`id`,`age`) +); + +DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_from_multiindex; +CREATE TABLE test_e2e_mysql.create_tbl_from_multiindex ( +`id` int DEFAULT NULL, +`name` varchar(255) DEFAULT NULL, +`age` int DEFAULT NULL, +UNIQUE KEY `uniq` (`id`), +KEY `normal` (`name`) +); + +-- for auto partition table +DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_part_uniq; +CREATE TABLE test_e2e_mysql.create_tbl_part_uniq ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` int DEFAULT NULL, +`create_dtime` datetime DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); + +DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_part_dup; +CREATE TABLE test_e2e_mysql.create_tbl_part_dup ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` int DEFAULT NULL, +`create_dtime` datetime DEFAULT NULL +); \ No newline at end of file