From 0ccf7287d2d3984123d8f9ef3b3d478e088f370c Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 26 Dec 2024 18:44:15 +0800 Subject: [PATCH] fix oracle ora-12733 --- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 11 ++++++++++- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 14 ++++---------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 701bd9672..0c1b860a2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -42,6 +42,7 @@ import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer; import org.apache.doris.flink.table.DorisConfigOptions; +import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,7 @@ import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; @@ -359,7 +361,14 @@ protected boolean isSyncNeeded(String tableName) { protected String getSyncTableList(List syncTables) { if (!singleSink) { - return String.format("(%s)\\.(%s)", getTableListPrefix(), String.join("|", syncTables)); + if (this instanceof OracleDatabaseSync) { + return syncTables.stream() + .map(v -> getTableListPrefix() + "\\." + v) + .collect(Collectors.joining("|")); + } else { + return String.format( + "(%s)\\.(%s)", getTableListPrefix(), String.join("|", syncTables)); + } } else { // includingTablePattern and ^excludingPattern if (includingTables == null) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index 1a6a41877..629d7c4c2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -164,16 +164,10 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { Preconditions.checkNotNull(databaseName, "database-name in oracle is required"); Preconditions.checkNotNull(schemaName, "schema-name in oracle is required"); String tableName = config.get(OracleSourceOptions.TABLE_NAME); - // When debezium incrementally reads (refer LogMinerQueryBuilder.listOfPatternsToSql), - // it will be judged based on regexp_like. When the regular length exceeds 512, an error - // will be reported, like ORA-12733: regular expression too long - if (tableName.length() > 450) { - // REGEXP_LIKE('^SCHEMA.(TBL1|TBL2)$') - if (StringUtils.isNullOrWhitespaceOnly(excludingTables) - && (StringUtils.isNullOrWhitespaceOnly(includingTables) - || ".*".equals(includingTables))) { - tableName = ".*"; - } + // LogMinerQueryBuilder.buildTablePredicate is separated by commas to avoid + // the error ORA-12733 when the regexp_like regular expression exceeds 512 characters + if (!singleSink && tableName.length() > 256) { + tableName = tableName.replace("|", ","); } String url = config.get(OracleSourceOptions.URL);