Skip to content

Commit

Permalink
[Improve] add check for doris.batch.size (apache#480)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Sep 3, 2024
1 parent 1f9f12d commit b42bc6b
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public interface ConfigurationOptions {
Integer DORIS_TABLET_SIZE_MIN = 1;

String DORIS_BATCH_SIZE = "doris.batch.size";
Integer DORIS_BATCH_SIZE_DEFAULT = 1024;
Integer DORIS_BATCH_SIZE_DEFAULT = 4064;
Integer DORIS_BATCH_SIZE_MAX = 65535;

String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 8589934592L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ public boolean schemaChange(JsonNode recordRoot) {
String dorisTbl = getCreateTableIdentifier(recordRoot);
changeContext.getTableMapping().put(cdcTbl, dorisTbl);
this.tableMapping = changeContext.getTableMapping();
LOG.info("create table ddl status: {}", status);
LOG.info(
"create table ddl status: {}, add tableMapping {},{}",
status,
cdcTbl,
dorisTbl);
}
} else if (eventType.equals(EventType.ALTER)) {
Tuple2<String, String> dorisTableTuple = getDorisTableTuple(recordRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ public boolean schemaChange(JsonNode recordRoot) {
String dorisTbl = getCreateTableIdentifier(recordRoot);
changeContext.getTableMapping().put(cdcTbl, dorisTbl);
this.tableMapping = changeContext.getTableMapping();
LOG.info("create table ddl status: {}", status);
LOG.info(
"create table ddl status: {}, add tableMapping {},{}",
status,
cdcTbl,
dorisTbl);
}
} else if (eventType.equals(EventType.ALTER)) {
Tuple2<String, String> dorisTableTuple = getDorisTableTuple(recordRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_MAX;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
Expand Down Expand Up @@ -130,7 +131,7 @@ private TScanOpenParams openParams() {
Integer batchSize =
readOptions.getRequestBatchSize() == null
? DORIS_BATCH_SIZE_DEFAULT
: readOptions.getRequestBatchSize();
: Math.min(readOptions.getRequestBatchSize(), DORIS_BATCH_SIZE_MAX);
Integer queryDorisTimeout =
readOptions.getRequestQueryTimeoutS() == null
? DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void build() throws Exception {
System.out.println("Create table finished.");
System.exit(0);
}
LOG.info("table mapping: {}", tableMapping);
config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
DataStreamSource<String> streamSource = buildCdcSource(env);
if (singleSink) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testDorisSourceProperties() {
Map<String, String> properties = getAllOptions();
properties.put("doris.request.query.timeout", "21600s");
properties.put("doris.request.tablet.size", "1");
properties.put("doris.batch.size", "1024");
properties.put("doris.batch.size", "4064");
properties.put("doris.exec.mem.limit", "8192mb");
properties.put("doris.deserialize.arrow.async", "false");
properties.put("doris.deserialize.queue.size", "64");
Expand Down Expand Up @@ -118,7 +118,7 @@ public void testDorisSinkProperties() {
Map<String, String> properties = getAllOptions();
properties.put("doris.request.query.timeout", "21600s");
properties.put("doris.request.tablet.size", "1");
properties.put("doris.batch.size", "1024");
properties.put("doris.batch.size", "4064");
properties.put("doris.exec.mem.limit", "8192mb");
properties.put("doris.deserialize.arrow.async", "false");
properties.put("doris.deserialize.queue.size", "64");
Expand Down

0 comments on commit b42bc6b

Please sign in to comment.