Skip to content

Commit

Permalink
[fix](cdc) add Oracle table name validation (apache#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 authored Feb 26, 2024
1 parent 2e123bf commit 54c94a9
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;

public class TableSchema {
public static final String DORIS_TABLE_REGEX = "^[a-zA-Z][a-zA-Z0-9-_]*$";
private String database;
private String table;
private String tableComment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
Expand Down Expand Up @@ -118,6 +120,15 @@ public List<SourceSchema> getSchemaList() throws Exception {
if (!isSyncNeeded(tableName)) {
continue;
}
// Oracle allows table names to contain special characters such as /, #, $,
// etc., as in 'A/B'.
// However, Doris does not support tables with these characters.
if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) {
throw new CreateTableException(
String.format(
"The table name %s is invalid. Table names in Doris must match the regex pattern %s. Please consider renaming the table or use the 'excluding-tables' option to filter it out.",
tableName, TableSchema.DORIS_TABLE_REGEX));
}
SourceSchema sourceSchema =
new OracleSchema(
metaData, databaseName, schemaName, tableName, tableComment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
Preconditions.checkNotNull(databaseName, "database-name in sqlserver is required");
Preconditions.checkNotNull(databaseName, "schema-name in sqlserver is required");
Preconditions.checkNotNull(schemaName, "schema-name in sqlserver is required");

String tableName = config.get(JdbcSourceOptions.TABLE_NAME);
String hostname = config.get(JdbcSourceOptions.HOSTNAME);
Expand Down

0 comments on commit 54c94a9

Please sign in to comment.