From 0f1615b28a5f78a0e7f773121a62d33310600ae6 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 3 Sep 2024 16:33:43 +0800 Subject: [PATCH] add check for doris.batch.size and improve some log print --- .../org/apache/doris/flink/cfg/ConfigurationOptions.java | 2 +- .../java/org/apache/doris/flink/cfg/DorisReadOptions.java | 5 +++++ .../jsondebezium/JsonDebeziumSchemaChangeImplV2.java | 2 +- .../serializer/jsondebezium/SQLParserSchemaChange.java | 2 +- .../java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 1 + 5 files changed, 9 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java index c249c2519..50103307f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java @@ -42,7 +42,7 @@ public interface ConfigurationOptions { Integer DORIS_TABLET_SIZE_MIN = 1; String DORIS_BATCH_SIZE = "doris.batch.size"; - Integer DORIS_BATCH_SIZE_DEFAULT = 1024; + Integer DORIS_BATCH_SIZE_DEFAULT = 4064; String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit"; Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 8589934592L; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 2f6cd8a86..14bdf7e63 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -17,6 +17,8 @@ package org.apache.doris.flink.cfg; +import org.apache.flink.util.Preconditions; + import java.io.Serializable; import java.util.Objects; @@ -276,6 +278,9 @@ public Builder setFlightSqlPort(Integer flightSqlPort) { } public DorisReadOptions build() { + Preconditions.checkArgument( + requestBatchSize >= 1 && requestBatchSize <= 65535, + "batchSize should be between 1 and 65535"); return new DorisReadOptions( readFields, filterQuery, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index 6600dd07e..86848a707 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -123,7 +123,7 @@ public boolean schemaChange(JsonNode recordRoot) { String dorisTbl = getCreateTableIdentifier(recordRoot); changeContext.getTableMapping().put(cdcTbl, dorisTbl); this.tableMapping = changeContext.getTableMapping(); - LOG.info("create table ddl status: {}", status); + LOG.info("create table ddl status: {}, add tableMapping {},{}", status, cdcTbl, dorisTbl); } } else if (eventType.equals(EventType.ALTER)) { Tuple2 dorisTableTuple = getDorisTableTuple(recordRoot); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java index 5bf245ee2..4aebc5a17 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java @@ -83,7 +83,7 @@ public boolean schemaChange(JsonNode recordRoot) { String dorisTbl = getCreateTableIdentifier(recordRoot); changeContext.getTableMapping().put(cdcTbl, dorisTbl); this.tableMapping = changeContext.getTableMapping(); - LOG.info("create table ddl status: {}", status); + LOG.info("create table ddl status: {}, add tableMapping {},{}", status, cdcTbl, dorisTbl); } } else if (eventType.equals(EventType.ALTER)) { Tuple2 dorisTableTuple = getDorisTableTuple(recordRoot); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 5ae44da66..7cd29506e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -158,6 +158,7 @@ public void build() throws Exception { System.out.println("Create table finished."); System.exit(0); } + LOG.info("table mapping: {}", tableMapping); config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables)); DataStreamSource streamSource = buildCdcSource(env); if (singleSink) {