diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java index f3da96289..4cc9098fa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java @@ -23,6 +23,7 @@ import java.util.Map; public class TableSchema { + public static final String DORIS_TABLE_REGEX = "^[a-zA-Z][a-zA-Z0-9-_]*$"; private String database; private String table; private String tableComment; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index ef2e7ac84..8ca66e4d8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -32,6 +32,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.exception.CreateTableException; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.slf4j.Logger; @@ -118,6 +120,15 @@ public List getSchemaList() throws Exception { if (!isSyncNeeded(tableName)) { continue; } + // Oracle allows table names to contain special characters such as /, #, $, + // etc., as in 'A/B'. + // However, Doris does not support tables with these characters. + if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) { + throw new CreateTableException( + String.format( + "The table name %s is invalid. Table names in Doris must match the regex pattern %s. Please consider renaming the table or use the 'excluding-tables' option to filter it out.", + tableName, TableSchema.DORIS_TABLE_REGEX)); + } SourceSchema sourceSchema = new OracleSchema( metaData, databaseName, schemaName, tableName, tableComment); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java index da96f0804..f4d6ba3f1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java @@ -125,7 +125,7 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME); String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME); Preconditions.checkNotNull(databaseName, "database-name in sqlserver is required"); - Preconditions.checkNotNull(databaseName, "schema-name in sqlserver is required"); + Preconditions.checkNotNull(schemaName, "schema-name in sqlserver is required"); String tableName = config.get(JdbcSourceOptions.TABLE_NAME); String hostname = config.get(JdbcSourceOptions.HOSTNAME);