Skip to content

Commit

Permalink
add check for doris.batch.size and improve some log print
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Sep 3, 2024
1 parent 1f9f12d commit 0f1615b
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public interface ConfigurationOptions {
Integer DORIS_TABLET_SIZE_MIN = 1;

String DORIS_BATCH_SIZE = "doris.batch.size";
Integer DORIS_BATCH_SIZE_DEFAULT = 1024;
Integer DORIS_BATCH_SIZE_DEFAULT = 4064;

String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 8589934592L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.flink.cfg;

import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.Objects;

Expand Down Expand Up @@ -276,6 +278,9 @@ public Builder setFlightSqlPort(Integer flightSqlPort) {
}

public DorisReadOptions build() {
Preconditions.checkArgument(
requestBatchSize >= 1 && requestBatchSize <= 65535,
"batchSize should be between 1 and 65535");
return new DorisReadOptions(
readFields,
filterQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public boolean schemaChange(JsonNode recordRoot) {
String dorisTbl = getCreateTableIdentifier(recordRoot);
changeContext.getTableMapping().put(cdcTbl, dorisTbl);
this.tableMapping = changeContext.getTableMapping();
LOG.info("create table ddl status: {}", status);
LOG.info("create table ddl status: {}, add tableMapping {},{}", status, cdcTbl, dorisTbl);
}
} else if (eventType.equals(EventType.ALTER)) {
Tuple2<String, String> dorisTableTuple = getDorisTableTuple(recordRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public boolean schemaChange(JsonNode recordRoot) {
String dorisTbl = getCreateTableIdentifier(recordRoot);
changeContext.getTableMapping().put(cdcTbl, dorisTbl);
this.tableMapping = changeContext.getTableMapping();
LOG.info("create table ddl status: {}", status);
LOG.info("create table ddl status: {}, add tableMapping {},{}", status, cdcTbl, dorisTbl);
}
} else if (eventType.equals(EventType.ALTER)) {
Tuple2<String, String> dorisTableTuple = getDorisTableTuple(recordRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void build() throws Exception {
System.out.println("Create table finished.");
System.exit(0);
}
LOG.info("table mapping: {}", tableMapping);
config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
DataStreamSource<String> streamSource = buildCdcSource(env);
if (singleSink) {
Expand Down

0 comments on commit 0f1615b

Please sign in to comment.