Skip to content

Commit

Permalink
improve dep and add builder comment
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Oct 11, 2024
1 parent 8efaa43 commit 4848fd0
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 19 deletions.
10 changes: 1 addition & 9 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,6 @@ under the License.
<artifactId>thrift-service</artifactId>
<version>${thrift-service.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
Expand Down Expand Up @@ -369,7 +361,7 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<scope>test</scope>
</dependency>
<!--Test-->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,58 +292,125 @@ public static class Builder {
private WriteMode writeMode = WriteMode.STREAM_LOAD;
private boolean ignoreCommitError = false;

/**
* Sets the checkInterval to check exception with the interval while loading, The default is
* 0, disabling the checker thread.
*
* @param checkInterval
* @return this DorisExecutionOptions.builder.
*/
public Builder setCheckInterval(Integer checkInterval) {
this.checkInterval = checkInterval;
return this;
}

/**
* Sets the maxRetries to load data. In batch mode, this parameter is the number of stream
* load retries, In non-batch mode, this parameter is the number of retries in the commit
* phase.
*
* @param maxRetries
* @return this DorisExecutionOptions.builder.
*/
public Builder setMaxRetries(Integer maxRetries) {
this.maxRetries = maxRetries;
return this;
}

/**
* Sets the buffer size to cache data for stream load. Only valid in non-batch mode.
*
* @param bufferSize
* @return this DorisExecutionOptions.builder.
*/
public Builder setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

/**
* Sets the buffer count to cache data for stream load. Only valid in non-batch mode.
*
* @param bufferCount
* @return this DorisExecutionOptions.builder.
*/
public Builder setBufferCount(int bufferCount) {
this.bufferCount = bufferCount;
return this;
}

/**
* Sets the unique label prefix for stream load.
*
* @param labelPrefix
* @return this DorisExecutionOptions.builder.
*/
public Builder setLabelPrefix(String labelPrefix) {
this.labelPrefix = labelPrefix;
return this;
}

/**
* Sets whether to use cache for stream load. Only valid in non-batch mode.
*
* @param useCache
* @return this DorisExecutionOptions.builder.
*/
public Builder setUseCache(boolean useCache) {
this.useCache = useCache;
return this;
}

/**
* Sets the properties for stream load.
*
* @param streamLoadProp
* @return this DorisExecutionOptions.builder.
*/
public Builder setStreamLoadProp(Properties streamLoadProp) {
this.streamLoadProp = streamLoadProp;
return this;
}

/**
* Sets whether to perform the deletion operation for stream load.
*
* @param enableDelete
* @return this DorisExecutionOptions.builder.
*/
public Builder setDeletable(Boolean enableDelete) {
this.enableDelete = enableDelete;
return this;
}

/**
* Sets whether to disable 2pc(two-phase commit) for stream load.
*
* @return this DorisExecutionOptions.builder.
*/
public Builder disable2PC() {
this.enable2PC = false;
return this;
}

/**
* Sets whether to force 2pc on. The default uniq model will turn off 2pc.
*
* @return this DorisExecutionOptions.builder.
*/
public Builder enable2PC() {
this.enable2PC = true;
// Force open 2pc
this.force2PC = true;
return this;
}

/**
* Set whether to use batch mode to stream load.
*
* @param enableBatchMode
* @return this DorisExecutionOptions.builder.
*/
public Builder setBatchMode(Boolean enableBatchMode) {
this.enableBatchMode = enableBatchMode;
if (enableBatchMode.equals(Boolean.TRUE)) {
Expand All @@ -352,41 +419,89 @@ public Builder setBatchMode(Boolean enableBatchMode) {
return this;
}

/**
* Set queue size in batch mode.
*
* @param flushQueueSize
* @return this DorisExecutionOptions.builder.
*/
public Builder setFlushQueueSize(int flushQueueSize) {
this.flushQueueSize = flushQueueSize;
return this;
}

/**
* Set the flush interval mills for stream load in batch mode.
*
* @param bufferFlushIntervalMs
* @return this DorisExecutionOptions.builder.
*/
public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
this.bufferFlushIntervalMs = bufferFlushIntervalMs;
return this;
}

/**
* Set the max flush rows for stream load in batch mode.
*
* @param bufferFlushMaxRows
* @return this DorisExecutionOptions.builder.
*/
public Builder setBufferFlushMaxRows(int bufferFlushMaxRows) {
this.bufferFlushMaxRows = bufferFlushMaxRows;
return this;
}

/**
* Set the max flush bytes for stream load in batch mode.
*
* @param bufferFlushMaxBytes
* @return this DorisExecutionOptions.builder.
*/
public Builder setBufferFlushMaxBytes(int bufferFlushMaxBytes) {
this.bufferFlushMaxBytes = bufferFlushMaxBytes;
return this;
}

/**
* Set Whether to ignore the ignore updateBefore event.
*
* @param ignoreUpdateBefore
* @return this DorisExecutionOptions.builder.
*/
public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
this.ignoreUpdateBefore = ignoreUpdateBefore;
return this;
}

/**
* Set the writing mode, only supports STREAM_LOAD and STREAM_LOAD_BATCH
*
* @param writeMode
* @return this DorisExecutionOptions.builder.
*/
public Builder setWriteMode(WriteMode writeMode) {
this.writeMode = writeMode;
return this;
}

/**
* Set whether to ignore commit failure errors. This is only valid in non-batch mode 2pc.
* When ignored, data loss may occur.
*
* @param ignoreCommitError
* @return this DorisExecutionOptions.builder.
*/
public Builder setIgnoreCommitError(boolean ignoreCommitError) {
this.ignoreCommitError = ignoreCommitError;
return this;
}

/**
* Build the {@link DorisExecutionOptions}.
*
* @return a DorisExecutionOptions with the settings made for this builder.
*/
public DorisExecutionOptions build() {
// If format=json is set but read_json_by_line is not set, record may not be written.
if (streamLoadProp != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,47 +101,89 @@ public static class Builder {
private boolean autoRedirect = true;
private String tableIdentifier;

/** required, tableIdentifier. */
/**
* Sets the tableIdentifier for the DorisOptions.
*
* @param tableIdentifier Doris's database name and table name, such as db.tbl
* @return this DorisOptions.builder.
*/
public Builder setTableIdentifier(String tableIdentifier) {
this.tableIdentifier = tableIdentifier;
return this;
}

/** optional, user name. */
/**
* Sets the username of doris cluster.
*
* @param username Doris cluster username
* @return this DorisOptions.builder.
*/
public Builder setUsername(String username) {
this.username = username;
return this;
}

/** optional, password. */
/**
* Sets the password of doris cluster.
*
* @param password Doris cluster password
* @return this DorisOptions.builder.
*/
public Builder setPassword(String password) {
this.password = password;
return this;
}

/** required, Frontend Http Rest url. */
/**
* Sets the doris frontend http rest url, such as 127.0.0.1:8030,127.0.0.2:8030
*
* @param fenodes
* @return this DorisOptions.builder.
*/
public Builder setFenodes(String fenodes) {
this.fenodes = fenodes;
return this;
}

/** optional, Backend Http Port. */
/**
* Sets the doris backend http rest url, such as 127.0.0.1:8040,127.0.0.2:8040
*
* @param benodes
* @return this DorisOptions.builder.
*/
public Builder setBenodes(String benodes) {
this.benodes = benodes;
return this;
}

/** not required, fe jdbc url, for lookup query. */
/**
* Sets the doris fe jdbc url for lookup query, such as jdbc:mysql://127.0.0.1:9030
*
* @param jdbcUrl
* @return this DorisOptions.builder.
*/
public Builder setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
return this;
}

/**
* Sets the autoRedirect for DorisOptions. If true, stream load will be written directly to
* fe. If false, it will first get the be list and write directly to be.
*
* @param autoRedirect
* @return this DorisOptions.builder.
*/
public Builder setAutoRedirect(boolean autoRedirect) {
this.autoRedirect = autoRedirect;
return this;
}

/**
* Build the {@link DorisOptions}.
*
* @return a DorisOptions with the settings made for this builder.
*/
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
// multi table load, don't need check
Expand Down
Loading

0 comments on commit 4848fd0

Please sign in to comment.