From 4848fd096d479afac8ab655232924c5c11309adf Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 11 Oct 2024 11:28:15 +0800 Subject: [PATCH] improve dep and add builder comment --- flink-doris-connector/pom.xml | 10 +- .../flink/cfg/DorisExecutionOptions.java | 115 ++++++++++++++++++ .../apache/doris/flink/cfg/DorisOptions.java | 54 +++++++- .../doris/flink/cfg/DorisReadOptions.java | 101 ++++++++++++++- .../apache/doris/flink/sink/DorisSink.java | 30 +++++ .../doris/flink/source/DorisSource.java | 24 ++++ 6 files changed, 315 insertions(+), 19 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index d773339b3..775242b05 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -105,14 +105,6 @@ under the License. thrift-service ${thrift-service.version} - - - org.apache.flink - flink-clients - ${flink.version} - provided - - org.apache.flink flink-table-planner-loader @@ -369,7 +361,7 @@ under the License. org.apache.flink flink-runtime-web ${flink.version} - provided + test diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 7ad8ba971..831a317ee 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -292,51 +292,112 @@ public static class Builder { private WriteMode writeMode = WriteMode.STREAM_LOAD; private boolean ignoreCommitError = false; + /** + * Sets the checkInterval to check exception with the interval while loading, The default is + * 0, disabling the checker thread. + * + * @param checkInterval + * @return this DorisExecutionOptions.builder. + */ public Builder setCheckInterval(Integer checkInterval) { this.checkInterval = checkInterval; return this; } + /** + * Sets the maxRetries to load data. In batch mode, this parameter is the number of stream + * load retries, In non-batch mode, this parameter is the number of retries in the commit + * phase. + * + * @param maxRetries + * @return this DorisExecutionOptions.builder. + */ public Builder setMaxRetries(Integer maxRetries) { this.maxRetries = maxRetries; return this; } + /** + * Sets the buffer size to cache data for stream load. Only valid in non-batch mode. + * + * @param bufferSize + * @return this DorisExecutionOptions.builder. + */ public Builder setBufferSize(int bufferSize) { this.bufferSize = bufferSize; return this; } + /** + * Sets the buffer count to cache data for stream load. Only valid in non-batch mode. + * + * @param bufferCount + * @return this DorisExecutionOptions.builder. + */ public Builder setBufferCount(int bufferCount) { this.bufferCount = bufferCount; return this; } + /** + * Sets the unique label prefix for stream load. + * + * @param labelPrefix + * @return this DorisExecutionOptions.builder. + */ public Builder setLabelPrefix(String labelPrefix) { this.labelPrefix = labelPrefix; return this; } + /** + * Sets whether to use cache for stream load. Only valid in non-batch mode. + * + * @param useCache + * @return this DorisExecutionOptions.builder. + */ public Builder setUseCache(boolean useCache) { this.useCache = useCache; return this; } + /** + * Sets the properties for stream load. + * + * @param streamLoadProp + * @return this DorisExecutionOptions.builder. + */ public Builder setStreamLoadProp(Properties streamLoadProp) { this.streamLoadProp = streamLoadProp; return this; } + /** + * Sets whether to perform the deletion operation for stream load. + * + * @param enableDelete + * @return this DorisExecutionOptions.builder. + */ public Builder setDeletable(Boolean enableDelete) { this.enableDelete = enableDelete; return this; } + /** + * Sets whether to disable 2pc(two-phase commit) for stream load. + * + * @return this DorisExecutionOptions.builder. + */ public Builder disable2PC() { this.enable2PC = false; return this; } + /** + * Sets whether to force 2pc on. The default uniq model will turn off 2pc. + * + * @return this DorisExecutionOptions.builder. + */ public Builder enable2PC() { this.enable2PC = true; // Force open 2pc @@ -344,6 +405,12 @@ public Builder enable2PC() { return this; } + /** + * Set whether to use batch mode to stream load. + * + * @param enableBatchMode + * @return this DorisExecutionOptions.builder. + */ public Builder setBatchMode(Boolean enableBatchMode) { this.enableBatchMode = enableBatchMode; if (enableBatchMode.equals(Boolean.TRUE)) { @@ -352,41 +419,89 @@ public Builder setBatchMode(Boolean enableBatchMode) { return this; } + /** + * Set queue size in batch mode. + * + * @param flushQueueSize + * @return this DorisExecutionOptions.builder. + */ public Builder setFlushQueueSize(int flushQueueSize) { this.flushQueueSize = flushQueueSize; return this; } + /** + * Set the flush interval mills for stream load in batch mode. + * + * @param bufferFlushIntervalMs + * @return this DorisExecutionOptions.builder. + */ public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) { this.bufferFlushIntervalMs = bufferFlushIntervalMs; return this; } + /** + * Set the max flush rows for stream load in batch mode. + * + * @param bufferFlushMaxRows + * @return this DorisExecutionOptions.builder. + */ public Builder setBufferFlushMaxRows(int bufferFlushMaxRows) { this.bufferFlushMaxRows = bufferFlushMaxRows; return this; } + /** + * Set the max flush bytes for stream load in batch mode. + * + * @param bufferFlushMaxBytes + * @return this DorisExecutionOptions.builder. + */ public Builder setBufferFlushMaxBytes(int bufferFlushMaxBytes) { this.bufferFlushMaxBytes = bufferFlushMaxBytes; return this; } + /** + * Set Whether to ignore the ignore updateBefore event. + * + * @param ignoreUpdateBefore + * @return this DorisExecutionOptions.builder. + */ public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) { this.ignoreUpdateBefore = ignoreUpdateBefore; return this; } + /** + * Set the writing mode, only supports STREAM_LOAD and STREAM_LOAD_BATCH + * + * @param writeMode + * @return this DorisExecutionOptions.builder. + */ public Builder setWriteMode(WriteMode writeMode) { this.writeMode = writeMode; return this; } + /** + * Set whether to ignore commit failure errors. This is only valid in non-batch mode 2pc. + * When ignored, data loss may occur. + * + * @param ignoreCommitError + * @return this DorisExecutionOptions.builder. + */ public Builder setIgnoreCommitError(boolean ignoreCommitError) { this.ignoreCommitError = ignoreCommitError; return this; } + /** + * Build the {@link DorisExecutionOptions}. + * + * @return a DorisExecutionOptions with the settings made for this builder. + */ public DorisExecutionOptions build() { // If format=json is set but read_json_by_line is not set, record may not be written. if (streamLoadProp != null diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index bf6c7a28c..69273c9e0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -101,47 +101,89 @@ public static class Builder { private boolean autoRedirect = true; private String tableIdentifier; - /** required, tableIdentifier. */ + /** + * Sets the tableIdentifier for the DorisOptions. + * + * @param tableIdentifier Doris's database name and table name, such as db.tbl + * @return this DorisOptions.builder. + */ public Builder setTableIdentifier(String tableIdentifier) { this.tableIdentifier = tableIdentifier; return this; } - /** optional, user name. */ + /** + * Sets the username of doris cluster. + * + * @param username Doris cluster username + * @return this DorisOptions.builder. + */ public Builder setUsername(String username) { this.username = username; return this; } - /** optional, password. */ + /** + * Sets the password of doris cluster. + * + * @param password Doris cluster password + * @return this DorisOptions.builder. + */ public Builder setPassword(String password) { this.password = password; return this; } - /** required, Frontend Http Rest url. */ + /** + * Sets the doris frontend http rest url, such as 127.0.0.1:8030,127.0.0.2:8030 + * + * @param fenodes + * @return this DorisOptions.builder. + */ public Builder setFenodes(String fenodes) { this.fenodes = fenodes; return this; } - /** optional, Backend Http Port. */ + /** + * Sets the doris backend http rest url, such as 127.0.0.1:8040,127.0.0.2:8040 + * + * @param benodes + * @return this DorisOptions.builder. + */ public Builder setBenodes(String benodes) { this.benodes = benodes; return this; } - /** not required, fe jdbc url, for lookup query. */ + /** + * Sets the doris fe jdbc url for lookup query, such as jdbc:mysql://127.0.0.1:9030 + * + * @param jdbcUrl + * @return this DorisOptions.builder. + */ public Builder setJdbcUrl(String jdbcUrl) { this.jdbcUrl = jdbcUrl; return this; } + /** + * Sets the autoRedirect for DorisOptions. If true, stream load will be written directly to + * fe. If false, it will first get the be list and write directly to be. + * + * @param autoRedirect + * @return this DorisOptions.builder. + */ public Builder setAutoRedirect(boolean autoRedirect) { this.autoRedirect = autoRedirect; return this; } + /** + * Build the {@link DorisOptions}. + * + * @return a DorisOptions with the settings made for this builder. + */ public DorisOptions build() { checkNotNull(fenodes, "No fenodes supplied."); // multi table load, don't need check diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 937d32866..0448d60a9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -223,76 +223,169 @@ public static class Builder { private Boolean useFlightSql = false; private Integer flightSqlPort; + /** + * Sets the readFields for doris table to push down projection, such as name,age. + * + * @param readFields + * @return this DorisReadOptions.builder. + */ public Builder setReadFields(String readFields) { this.readFields = readFields; return this; } + /** + * Sets the filterQuery for doris table to push down filter, such as name,age. + * + * @param filterQuery + * @return this DorisReadOptions.builder. + */ public Builder setFilterQuery(String filterQuery) { this.filterQuery = filterQuery; return this; } + /** + * Sets the requestTabletSize for DorisReadOptions. The number of Doris Tablets + * corresponding to a Partition, the smaller this value is set, the more Partitions will be + * generated. This improves the parallelism on the Flink side, but at the same time puts + * more pressure on Doris. + * + * @param requestTabletSize + * @return this DorisReadOptions.builder. + */ public Builder setRequestTabletSize(Integer requestTabletSize) { this.requestTabletSize = requestTabletSize; return this; } + /** + * Sets the request connect timeout for DorisReadOptions. + * + * @param requestConnectTimeoutMs + * @return this DorisReadOptions.builder. + */ public Builder setRequestConnectTimeoutMs(Integer requestConnectTimeoutMs) { this.requestConnectTimeoutMs = requestConnectTimeoutMs; return this; } + /** + * Sets the request read timeout for DorisReadOptions. + * + * @param requestReadTimeoutMs + * @return this DorisReadOptions.builder. + */ public Builder setRequestReadTimeoutMs(Integer requestReadTimeoutMs) { this.requestReadTimeoutMs = requestReadTimeoutMs; return this; } + /** + * Sets the timeout time for querying Doris for DorisReadOptions. + * + * @param requesQueryTimeoutS + * @return this DorisReadOptions.builder. + */ public Builder setRequestQueryTimeoutS(Integer requesQueryTimeoutS) { this.requestQueryTimeoutS = requesQueryTimeoutS; return this; } + /** + * Sets the number of retries to send requests to Doris for DorisReadOptions. + * + * @param requestRetries + * @return this DorisReadOptions.builder. + */ public Builder setRequestRetries(Integer requestRetries) { this.requestRetries = requestRetries; return this; } + /** + * Sets the read batch size for DorisReadOptions. + * + * @param requestBatchSize + * @return this DorisReadOptions.builder. + */ public Builder setRequestBatchSize(Integer requestBatchSize) { this.requestBatchSize = requestBatchSize; return this; } + /** + * Sets the Memory limit for a single query for DorisReadOptions. + * + * @param execMemLimit + * @return this DorisReadOptions.builder. + */ public Builder setExecMemLimit(Long execMemLimit) { this.execMemLimit = execMemLimit; return this; } + /** + * Sets the Asynchronous conversion of internal processing queue in Arrow format + * + * @param deserializeQueueSize + * @return this DorisReadOptions.builder. + */ public Builder setDeserializeQueueSize(Integer deserializeQueueSize) { this.deserializeQueueSize = deserializeQueueSize; return this; } + /** + * Sets Whether to support asynchronous conversion of Arrow format to RowBatch needed for + * connector iterations. + * + * @param deserializeArrowAsync + * @return this DorisReadOptions.builder. + */ public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) { this.deserializeArrowAsync = deserializeArrowAsync; return this; } - public Builder setUseFlightSql(Boolean useFlightSql) { - this.useFlightSql = useFlightSql; + /** + * Whether to use the legacy source api + * + * @param useOldApi + * @return this DorisReadOptions.builder. + */ + public Builder setUseOldApi(Boolean useOldApi) { + this.useOldApi = useOldApi; return this; } - public Builder setUseOldApi(Boolean useOldApi) { - this.useOldApi = useOldApi; + /** + * Whether to use arrow flight sql for query, only supports Doris2.1 and above + * + * @param useFlightSql + * @return this DorisReadOptions.builder. + */ + public Builder setUseFlightSql(Boolean useFlightSql) { + this.useFlightSql = useFlightSql; return this; } + /** + * Sets the flight sql port for DorisReadOptions. + * + * @param flightSqlPort + * @return this DorisReadOptions.builder. + */ public Builder setFlightSqlPort(Integer flightSqlPort) { this.flightSqlPort = flightSqlPort; return this; } + /** + * Build the {@link DorisReadOptions}. + * + * @return a DorisReadOptions with the settings made for this builder. + */ public DorisReadOptions build() { return new DorisReadOptions( readFields, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java index fd61d7fd9..d8e0d8277 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -161,26 +161,56 @@ public static class Builder { private DorisExecutionOptions dorisExecutionOptions; private DorisRecordSerializer serializer; + /** + * Sets the DorisOptions for the DorisSink. + * + * @param dorisOptions the common options of the doris cluster. + * @return this DorisSink.Builder. + */ public Builder setDorisOptions(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; return this; } + /** + * Sets the DorisReadOptions for the DorisSink. + * + * @param dorisReadOptions the read options of the DorisSink. + * @return this DorisSink.Builder. + */ public Builder setDorisReadOptions(DorisReadOptions dorisReadOptions) { this.dorisReadOptions = dorisReadOptions; return this; } + /** + * Sets the DorisExecutionOptions for the DorisSink. + * + * @param dorisExecutionOptions the execution options of the DorisSink. + * @return this DorisSink.Builder. + */ public Builder setDorisExecutionOptions(DorisExecutionOptions dorisExecutionOptions) { this.dorisExecutionOptions = dorisExecutionOptions; return this; } + /** + * Sets the {@link DorisRecordSerializer serializer} that transforms incoming records to + * DorisRecord + * + * @param serializer + * @return this DorisSink.Builder. + */ public Builder setSerializer(DorisRecordSerializer serializer) { this.serializer = serializer; return this; } + /** + * Build the {@link DorisSink}. + * + * @return a DorisSink with the settings made for this builder. + */ public DorisSink build() { Preconditions.checkNotNull(dorisOptions); Preconditions.checkNotNull(dorisExecutionOptions); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java index 1b05453ad..19a7fe36d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java @@ -168,21 +168,40 @@ public static class DorisSourceBuilder { boundedness = Boundedness.BOUNDED; } + /** + * Sets the DorisOptions for the DorisSource. + * + * @param options the common options of the doris cluster. + * @return this DorisSourceBuilder. + */ public DorisSourceBuilder setDorisOptions(DorisOptions options) { this.options = options; return this; } + /** + * Sets the DorisReadOptions for the DorisSource. + * + * @param readOptions the read options of the DorisSource. + * @return this DorisSourceBuilder. + */ public DorisSourceBuilder setDorisReadOptions(DorisReadOptions readOptions) { this.readOptions = readOptions; return this; } + /** Sets the Boundedness for the DorisSource, Currently only BOUNDED is supported. */ public DorisSourceBuilder setBoundedness(Boundedness boundedness) { this.boundedness = boundedness; return this; } + /** + * Sets the {@link DorisDeserializationSchema deserializer} of the Record for DorisSource. + * + * @param deserializer the deserializer for Doris Record. + * @return this DorisSourceBuilder. + */ public DorisSourceBuilder setDeserializer( DorisDeserializationSchema deserializer) { this.deserializer = deserializer; @@ -194,6 +213,11 @@ public DorisSourceBuilder setResolvedFilterQuery(List resolvedFilte return this; } + /** + * Build the {@link DorisSource}. + * + * @return a DorisSource with the settings made for this builder. + */ public DorisSource build() { if (readOptions == null) { readOptions = DorisReadOptions.builder().build();