diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index dea4422c9..701bd9672 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -463,6 +463,11 @@ public void setTableSchemaBuckets( private void tryCreateTableIfAbsent( DorisSystem dorisSystem, String targetDb, String dorisTable, SourceSchema schema) { if (!dorisSystem.tableExists(targetDb, dorisTable)) { + if (dorisTableConfig.isConvertUniqToPk() + && CollectionUtil.isNullOrEmpty(schema.primaryKeys) + && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) { + schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs); + } TableSchema dorisSchema = DorisSchemaFactory.createTableSchema( database, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java index 6f5d929e9..6014f249d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java @@ -32,6 +32,7 @@ public class DorisTableConfig implements Serializable { public static final String REPLICATION_NUM = "replication_num"; public static final String TABLE_BUCKETS = "table-buckets"; public static final String TABLE_PARTITIONS = "table-partitions"; + public static final String CONVERT_UNIQ_TO_PK = "convert-uniq-to-pk"; private final Map tableProperties; // The specific parameters extracted from --table-conf need to be parsed and integrated into the @@ -39,6 +40,8 @@ public class DorisTableConfig implements Serializable { private Map tableBuckets; // table:partitionColumn:interval private Map> tablePartitions; + // uniq index to primary key + private boolean convertUniqToPk = false; // Only for testing @VisibleForTesting @@ -64,6 +67,11 @@ public DorisTableConfig(Map tableConfig) { tableConfig.remove(TABLE_PARTITIONS); } + if (tableConfig.containsKey(CONVERT_UNIQ_TO_PK)) { + this.convertUniqToPk = Boolean.parseBoolean(tableConfig.get(CONVERT_UNIQ_TO_PK)); + tableConfig.remove(CONVERT_UNIQ_TO_PK); + } + tableProperties = tableConfig; } @@ -79,6 +87,10 @@ public Map> getTablePartitions() { return tablePartitions; } + public boolean isConvertUniqToPk() { + return convertUniqToPk; + } + /** * Build table bucket Map. * diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java index 2547b976e..aa64037b8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java @@ -49,10 +49,7 @@ public JdbcSourceSchema( super(databaseName, schemaName, tableName, tableComment); fields = getColumnInfo(metaData, databaseName, schemaName, tableName); primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, tableName); - if (primaryKeys.isEmpty()) { - List uniqIndex = getUniqIndex(metaData, databaseName, schemaName, tableName); - primaryKeys.addAll(uniqIndex); - } + uniqueIndexs = getUniqIndex(metaData, databaseName, schemaName, tableName); } public LinkedHashMap getColumnInfo( diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java index de3e7975b..aed1754c0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java @@ -37,6 +37,7 @@ public abstract class SourceSchema { protected final String tableComment; protected LinkedHashMap fields; public List primaryKeys; + public List uniqueIndexs; public DataModel model = DataModel.UNIQUE; public SourceSchema( @@ -64,7 +65,6 @@ public static String getString(String databaseName, String schemaName, String ta if (!StringUtils.isNullOrWhitespaceOnly(schemaName)) { identifier.add(schemaName); } - if (!StringUtils.isNullOrWhitespaceOnly(tableName)) { identifier.add(tableName); } @@ -115,6 +115,10 @@ public List getPrimaryKeys() { return primaryKeys; } + public List getUniqueIndexs() { + return uniqueIndexs; + } + public String getTableComment() { return tableComment; }