From 989faca000fc2041478e371b9973f0acb22cad40 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 22 Nov 2024 15:18:29 +0800 Subject: [PATCH] use uniq index if primary key empty --- .../flink/tools/cdc/JdbcSourceSchema.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java index 31cfd1cbf..2547b976e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java @@ -27,8 +27,10 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * JdbcSourceSchema is a subclass of SourceSchema, used to build metadata about jdbc-related @@ -47,6 +49,10 @@ public JdbcSourceSchema( super(databaseName, schemaName, tableName, tableComment); fields = getColumnInfo(metaData, databaseName, schemaName, tableName); primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, tableName); + if (primaryKeys.isEmpty()) { + List uniqIndex = getUniqIndex(metaData, databaseName, schemaName, tableName); + primaryKeys.addAll(uniqIndex); + } } public LinkedHashMap getColumnInfo( @@ -96,5 +102,32 @@ public List getPrimaryKeys( return primaryKeys; } + /** + * Get the unique index of the table If the primary key is empty but there is a uniq key, then + * use the uniqkey instead of the primarykey + */ + public List getUniqIndex( + DatabaseMetaData metaData, String databaseName, String schemaName, String tableName) + throws SQLException { + Map> uniqIndexMap = new HashMap<>(); + String firstIndexName = null; + try (ResultSet rs = + metaData.getIndexInfo(databaseName, schemaName, tableName, true, true)) { + while (rs.next()) { + String columnName = rs.getString("COLUMN_NAME"); + String indexName = rs.getString("INDEX_NAME"); + if (firstIndexName == null) { + firstIndexName = indexName; + } + uniqIndexMap.computeIfAbsent(indexName, k -> new ArrayList<>()).add(columnName); + } + } + if (!uniqIndexMap.isEmpty()) { + // If there are multiple uniq indices, return one + return uniqIndexMap.get(firstIndexName); + } + return new ArrayList<>(); + } + public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale); }