Skip to content

Commit

Permalink
[Improve] standardize Flink parameters (apache#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Mar 11, 2024
1 parent a758cee commit 782f36a
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,14 @@ public interface ConfigurationOptions {
String DORIS_DEFAULT_CLUSTER = "default_cluster";

String TABLE_IDENTIFIER = "table.identifier";
String DORIS_TABLE_IDENTIFIER = "doris.table.identifier";
String DORIS_READ_FIELD = "doris.read.field";
String DORIS_FILTER_QUERY = "doris.filter.query";
String DORIS_FILTER_QUERY_IN_MAX_COUNT = "doris.filter.query.in.max.count";
Integer DORIS_FILTER_QUERY_IN_VALUE_UPPER_LIMIT = 10000;

String DORIS_USER = "username";
String DORIS_PASSWORD = "password";

String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user";
String DORIS_REQUEST_AUTH_PASSWORD = "doris.request.auth.password";
String DORIS_REQUEST_RETRIES = "doris.request.retries";
String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout.ms";
String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout.ms";
String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout.s";
String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout";
String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout";
String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout";
Integer DORIS_REQUEST_RETRIES_DEFAULT = 3;
Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
Expand All @@ -53,12 +46,9 @@ public interface ConfigurationOptions {

String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;

String DORIS_VALUE_READER_CLASS = "doris.value.reader.class";

String DORIS_EXEC_MEM_LIMIT_DEFAULT_STR = "2048mb";
String DORIS_DESERIALIZE_ARROW_ASYNC = "doris.deserialize.arrow.async";
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;

String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.factories.FactoryUtil;

import org.apache.doris.flink.sink.writer.WriteMode;
Expand All @@ -30,7 +31,7 @@
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT_STR;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
Expand Down Expand Up @@ -95,47 +96,52 @@ public class DorisConfigOptions {
ConfigOptions.key("doris.request.tablet.size")
.intType()
.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
.withDescription("");
public static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
ConfigOptions.key("doris.request.connect.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.withDescription("");
public static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
ConfigOptions.key("doris.request.read.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.withDescription("");
public static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
ConfigOptions.key("doris.request.query.timeout.s")
.intType()
.defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
.withDescription("");
.withDescription(
"The number of Doris Tablets corresponding to a Partition. The smaller this value is set, the more Partitions will be generated. This improves the parallelism on the Flink side, but at the same time puts more pressure on Doris.");
public static final ConfigOption<Duration> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
ConfigOptions.key("doris.request.connect.timeout")
.durationType()
.defaultValue(Duration.ofMillis(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT))
.withDescription("Connection timeout for sending requests to Doris");
public static final ConfigOption<Duration> DORIS_REQUEST_READ_TIMEOUT_MS =
ConfigOptions.key("doris.request.read.timeout")
.durationType()
.defaultValue(Duration.ofMillis(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT))
.withDescription("Read timeout for sending requests to Doris");
public static final ConfigOption<Duration> DORIS_REQUEST_QUERY_TIMEOUT_S =
ConfigOptions.key("doris.request.query.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT))
.withDescription(
"The timeout time for querying Doris, the default value is 1 hour, -1 means no timeout limit");
public static final ConfigOption<Integer> DORIS_REQUEST_RETRIES =
ConfigOptions.key("doris.request.retries")
.intType()
.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
.withDescription("");
.withDescription("Number of retries to send requests to Doris");
public static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
ConfigOptions.key("doris.deserialize.arrow.async")
.booleanType()
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription("");
.withDescription(
"Whether to support asynchronous conversion of Arrow format to RowBatch needed for connector iterations");
public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
ConfigOptions.key("doris.deserialize.queue.size")
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription("");
.withDescription(
"Asynchronous conversion of internal processing queue in Arrow format, effective when doris.deserialize.arrow.async is true");
public static final ConfigOption<Integer> DORIS_BATCH_SIZE =
ConfigOptions.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
.withDescription("");
public static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT =
.withDescription(
"The maximum number of rows to read data from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris. Thereby reducing the additional time overhead caused by network delay.");
public static final ConfigOption<MemorySize> DORIS_EXEC_MEM_LIMIT =
ConfigOptions.key("doris.exec.mem.limit")
.longType()
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
.withDescription("");
.memoryType()
.defaultValue(MemorySize.parse(DORIS_EXEC_MEM_LIMIT_DEFAULT_STR))
.withDescription("Memory limit for a single query. The default is 2048mb.");
public static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
ConfigOptions.key("source.use-old-api")
.booleanType()
Expand Down Expand Up @@ -198,20 +204,20 @@ public class DorisConfigOptions {
.defaultValue(true)
.withDescription("enable 2PC while loading");

public static final ConfigOption<Integer> SINK_CHECK_INTERVAL =
public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
ConfigOptions.key("sink.check-interval")
.intType()
.defaultValue(10000)
.durationType()
.defaultValue(Duration.ofMillis(10000))
.withDescription("check exception with the interval while loading");
public static final ConfigOption<Integer> SINK_MAX_RETRIES =
ConfigOptions.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
public static final ConfigOption<Integer> SINK_BUFFER_SIZE =
public static final ConfigOption<MemorySize> SINK_BUFFER_SIZE =
ConfigOptions.key("sink.buffer-size")
.intType()
.defaultValue(1024 * 1024)
.memoryType()
.defaultValue(MemorySize.parse("1mb"))
.withDescription("the buffer size to cache data for stream load.");
public static final ConfigOption<Integer> SINK_BUFFER_COUNT =
ConfigOptions.key("sink.buffer-count")
Expand Down Expand Up @@ -263,10 +269,10 @@ public class DorisConfigOptions {
.withDescription(
"The maximum number of flush items in each batch, the default is 5w");

public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_BYTES =
public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_BYTES =
ConfigOptions.key("sink.buffer-flush.max-bytes")
.intType()
.defaultValue(10 * 1024 * 1024)
.memoryType()
.defaultValue(MemorySize.parse("10mb"))
.withDescription(
"The maximum number of bytes flushed in each batch, the default is 10MB");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,16 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
final DorisReadOptions.Builder builder = DorisReadOptions.builder();
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
.setRequestQueryTimeoutS(
(int) readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
.setRequestConnectTimeoutMs(
(int) readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS).toMillis())
.setRequestReadTimeoutMs(
(int) readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS).toMillis())
.setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
.setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE))
.setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API));
Expand All @@ -220,9 +223,9 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
private DorisExecutionOptions getDorisExecutionOptions(
ReadableConfig readableConfig, Properties streamLoadProp) {
final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
builder.setCheckInterval(readableConfig.get(SINK_CHECK_INTERVAL));
builder.setCheckInterval((int) readableConfig.get(SINK_CHECK_INTERVAL).toMillis());
builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
builder.setBufferSize(readableConfig.get(SINK_BUFFER_SIZE));
builder.setBufferSize((int) readableConfig.get(SINK_BUFFER_SIZE).getBytes());
builder.setBufferCount(readableConfig.get(SINK_BUFFER_COUNT));
builder.setLabelPrefix(readableConfig.get(SINK_LABEL_PREFIX));
builder.setStreamLoadProp(streamLoadProp);
Expand All @@ -245,7 +248,8 @@ private DorisExecutionOptions getDorisExecutionOptions(
}
builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
builder.setBufferFlushMaxBytes(
(int) readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes());
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,10 @@ public DorisSink<String> buildDorisSink(String tableIdentifier) {
.ifPresent(executionBuilder::setBufferCount);
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE)
.ifPresent(executionBuilder::setBufferSize);
.ifPresent(v -> executionBuilder.setBufferSize((int) v.getBytes()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL)
.ifPresent(executionBuilder::setCheckInterval);
.ifPresent(v -> executionBuilder.setCheckInterval((int) v.toMillis()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_MAX_RETRIES)
.ifPresent(executionBuilder::setMaxRetries);
Expand All @@ -289,7 +289,7 @@ public DorisSink<String> buildDorisSink(String tableIdentifier) {
.ifPresent(executionBuilder::setBufferFlushMaxRows);
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES)
.ifPresent(executionBuilder::setBufferFlushMaxBytes);
.ifPresent(v -> executionBuilder.setBufferFlushMaxBytes((int) v.getBytes()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL)
.ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));
Expand Down

0 comments on commit 782f36a

Please sign in to comment.