diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java index 9b930ff4c..94cb35c7e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java @@ -104,9 +104,9 @@ private void appendGet(Get get) { public void close() throws IOException { if (started.compareAndSet(true, false)) { LOG.info("close executorService"); - actionWatcherExecutorService.shutdownNow(); + actionWatcherExecutorService.shutdown(); + workerExecutorService.shutdown(); workerStated.set(false); - workerExecutorService.shutdownNow(); this.actionWatcherExecutorService = null; this.workerExecutorService = null; this.semaphore = null; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 455000e45..e36590cf8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -61,6 +61,8 @@ public abstract class DatabaseSync { public StreamExecutionEnvironment env; private boolean createTableOnly = false; private boolean newSchemaChange; + protected String includingTables; + protected String excludingTables; public abstract Connection getConnection() throws SQLException; @@ -76,6 +78,8 @@ public void create(StreamExecutionEnvironment env, String database, Configuratio this.config = config; this.database = database; this.converter = new TableNameConverter(tablePrefix, tableSuffix); + this.includingTables = includingTables; + this.excludingTables = excludingTables; this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables); this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); this.ignoreDefaultValue = ignoreDefaultValue; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index 3b4bc312c..bfd974f33 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -107,11 +107,19 @@ public List getSchemaList() throws Exception { @Override public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { + Properties debeziumProperties = new Properties(); String databaseName = config.get(OracleSourceOptions.DATABASE_NAME); String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME); Preconditions.checkNotNull(databaseName, "database-name in oracle is required"); Preconditions.checkNotNull(schemaName, "schema-name in oracle is required"); String tableName = config.get(OracleSourceOptions.TABLE_NAME); + //When debezium incrementally reads, it will be judged based on regexp_like. + //When the regular length exceeds 512, an error will be reported, like ORA-12733: regular expression too long + if(tableName.length() > 384){ + //max database name length 128 + tableName = StringUtils.isNullOrWhitespaceOnly(includingTables) ? ".*" : includingTables; + } + String url = config.get(OracleSourceOptions.URL); String hostname = config.get(OracleSourceOptions.HOSTNAME); Integer port = config.get(OracleSourceOptions.PORT); @@ -127,7 +135,6 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { } //debezium properties set - Properties debeziumProperties = new Properties(); debeziumProperties.put("decimal.handling.mode", "string"); //date to string debeziumProperties.putAll(OracleDateConverter.DEFAULT_PROPS);