From b42bc6ba7ed0adfc3fd75f1110da687e25793cb7 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 3 Sep 2024 18:45:44 +0800 Subject: [PATCH] [Improve] add check for doris.batch.size (#480) --- .../org/apache/doris/flink/cfg/ConfigurationOptions.java | 3 ++- .../jsondebezium/JsonDebeziumSchemaChangeImplV2.java | 6 +++++- .../serializer/jsondebezium/SQLParserSchemaChange.java | 6 +++++- .../apache/doris/flink/source/reader/DorisValueReader.java | 3 ++- .../java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 1 + .../doris/flink/table/DorisDynamicTableFactoryTest.java | 4 ++-- 6 files changed, 17 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java index c249c2519..3709f0ae8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java @@ -42,7 +42,8 @@ public interface ConfigurationOptions { Integer DORIS_TABLET_SIZE_MIN = 1; String DORIS_BATCH_SIZE = "doris.batch.size"; - Integer DORIS_BATCH_SIZE_DEFAULT = 1024; + Integer DORIS_BATCH_SIZE_DEFAULT = 4064; + Integer DORIS_BATCH_SIZE_MAX = 65535; String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit"; Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 8589934592L; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index 6600dd07e..0b9172e85 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -123,7 +123,11 @@ public boolean schemaChange(JsonNode recordRoot) { String dorisTbl = getCreateTableIdentifier(recordRoot); changeContext.getTableMapping().put(cdcTbl, dorisTbl); this.tableMapping = changeContext.getTableMapping(); - LOG.info("create table ddl status: {}", status); + LOG.info( + "create table ddl status: {}, add tableMapping {},{}", + status, + cdcTbl, + dorisTbl); } } else if (eventType.equals(EventType.ALTER)) { Tuple2 dorisTableTuple = getDorisTableTuple(recordRoot); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java index 5bf245ee2..fd10ba538 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java @@ -83,7 +83,11 @@ public boolean schemaChange(JsonNode recordRoot) { String dorisTbl = getCreateTableIdentifier(recordRoot); changeContext.getTableMapping().put(cdcTbl, dorisTbl); this.tableMapping = changeContext.getTableMapping(); - LOG.info("create table ddl status: {}", status); + LOG.info( + "create table ddl status: {}, add tableMapping {},{}", + status, + cdcTbl, + dorisTbl); } } else if (eventType.equals(EventType.ALTER)) { Tuple2 dorisTableTuple = getDorisTableTuple(recordRoot); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java index 35639e8a9..e55e1775f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java @@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantLock; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_MAX; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; @@ -130,7 +131,7 @@ private TScanOpenParams openParams() { Integer batchSize = readOptions.getRequestBatchSize() == null ? DORIS_BATCH_SIZE_DEFAULT - : readOptions.getRequestBatchSize(); + : Math.min(readOptions.getRequestBatchSize(), DORIS_BATCH_SIZE_MAX); Integer queryDorisTimeout = readOptions.getRequestQueryTimeoutS() == null ? DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 5ae44da66..7cd29506e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -158,6 +158,7 @@ public void build() throws Exception { System.out.println("Create table finished."); System.exit(0); } + LOG.info("table mapping: {}", tableMapping); config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables)); DataStreamSource streamSource = buildCdcSource(env); if (singleSink) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java index 56887d93f..6b7ef1f7b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java @@ -53,7 +53,7 @@ public void testDorisSourceProperties() { Map properties = getAllOptions(); properties.put("doris.request.query.timeout", "21600s"); properties.put("doris.request.tablet.size", "1"); - properties.put("doris.batch.size", "1024"); + properties.put("doris.batch.size", "4064"); properties.put("doris.exec.mem.limit", "8192mb"); properties.put("doris.deserialize.arrow.async", "false"); properties.put("doris.deserialize.queue.size", "64"); @@ -118,7 +118,7 @@ public void testDorisSinkProperties() { Map properties = getAllOptions(); properties.put("doris.request.query.timeout", "21600s"); properties.put("doris.request.tablet.size", "1"); - properties.put("doris.batch.size", "1024"); + properties.put("doris.batch.size", "4064"); properties.put("doris.exec.mem.limit", "8192mb"); properties.put("doris.deserialize.arrow.async", "false"); properties.put("doris.deserialize.queue.size", "64");