From 7afddab1ec698ca1a6b5639fdccb01b943e83b7f Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Sat, 12 Oct 2024 10:31:35 +0800
Subject: [PATCH] add flink-python dep , add pr 495 496 497
---
flink-doris-connector/pom.xml | 11 +-
.../flink/cfg/DorisExecutionOptions.java | 115 +++++++++++++++++
.../apache/doris/flink/cfg/DorisOptions.java | 54 +++++++-
.../doris/flink/cfg/DorisReadOptions.java | 119 +++++++++++++++++-
.../apache/doris/flink/sink/DorisSink.java | 30 +++++
.../doris/flink/source/DorisSource.java | 47 +++----
.../flink/table/DorisDynamicTableSource.java | 13 +-
.../doris/flink/source/DorisSourceITCase.java | 47 +++++++
8 files changed, 392 insertions(+), 44 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 2cdfbe810..7edef550a 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -105,14 +105,6 @@ under the License.
thrift-service
${thrift-service.version}
-
-
- org.apache.flink
- flink-clients
- ${flink.version}
- provided
-
-
org.apache.flink
flink-table-planner-loader
@@ -145,7 +137,6 @@ under the License.
org.apache.flink
${flink.python.id}
${flink.version}
- provided
@@ -287,7 +278,7 @@ under the License.
org.apache.flink
flink-runtime-web
${flink.version}
- provided
+ test
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 7ad8ba971..831a317ee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -292,51 +292,112 @@ public static class Builder {
private WriteMode writeMode = WriteMode.STREAM_LOAD;
private boolean ignoreCommitError = false;
+ /**
+ * Sets the interval at which exceptions are checked while loading. The default is
+ * 0, which disables the checker thread.
+ *
+ * @param checkInterval
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setCheckInterval(Integer checkInterval) {
this.checkInterval = checkInterval;
return this;
}
+ /**
+ * Sets the maxRetries to load data. In batch mode, this parameter is the number of stream
+ * load retries; in non-batch mode, this parameter is the number of retries in the commit
+ * phase.
+ *
+ * @param maxRetries
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setMaxRetries(Integer maxRetries) {
this.maxRetries = maxRetries;
return this;
}
+ /**
+ * Sets the buffer size to cache data for stream load. Only valid in non-batch mode.
+ *
+ * @param bufferSize
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
+ /**
+ * Sets the buffer count to cache data for stream load. Only valid in non-batch mode.
+ *
+ * @param bufferCount
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferCount(int bufferCount) {
this.bufferCount = bufferCount;
return this;
}
+ /**
+ * Sets the unique label prefix for stream load.
+ *
+ * @param labelPrefix
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setLabelPrefix(String labelPrefix) {
this.labelPrefix = labelPrefix;
return this;
}
+ /**
+ * Sets whether to use cache for stream load. Only valid in non-batch mode.
+ *
+ * @param useCache
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setUseCache(boolean useCache) {
this.useCache = useCache;
return this;
}
+ /**
+ * Sets the properties for stream load.
+ *
+ * @param streamLoadProp
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setStreamLoadProp(Properties streamLoadProp) {
this.streamLoadProp = streamLoadProp;
return this;
}
+ /**
+ * Sets whether to perform the deletion operation for stream load.
+ *
+ * @param enableDelete
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setDeletable(Boolean enableDelete) {
this.enableDelete = enableDelete;
return this;
}
+ /**
+ * Disables 2pc (two-phase commit) for stream load.
+ *
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder disable2PC() {
this.enable2PC = false;
return this;
}
+ /**
+ * Forces 2pc (two-phase commit) on. By default, the uniq model turns off 2pc.
+ *
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder enable2PC() {
this.enable2PC = true;
// Force open 2pc
@@ -344,6 +405,12 @@ public Builder enable2PC() {
return this;
}
+ /**
+ * Set whether to use batch mode to stream load.
+ *
+ * @param enableBatchMode
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBatchMode(Boolean enableBatchMode) {
this.enableBatchMode = enableBatchMode;
if (enableBatchMode.equals(Boolean.TRUE)) {
@@ -352,41 +419,89 @@ public Builder setBatchMode(Boolean enableBatchMode) {
return this;
}
+ /**
+ * Set queue size in batch mode.
+ *
+ * @param flushQueueSize
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setFlushQueueSize(int flushQueueSize) {
this.flushQueueSize = flushQueueSize;
return this;
}
+ /**
+ * Sets the flush interval in milliseconds for stream load in batch mode.
+ *
+ * @param bufferFlushIntervalMs
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
this.bufferFlushIntervalMs = bufferFlushIntervalMs;
return this;
}
+ /**
+ * Set the max flush rows for stream load in batch mode.
+ *
+ * @param bufferFlushMaxRows
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferFlushMaxRows(int bufferFlushMaxRows) {
this.bufferFlushMaxRows = bufferFlushMaxRows;
return this;
}
+ /**
+ * Set the max flush bytes for stream load in batch mode.
+ *
+ * @param bufferFlushMaxBytes
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferFlushMaxBytes(int bufferFlushMaxBytes) {
this.bufferFlushMaxBytes = bufferFlushMaxBytes;
return this;
}
+ /**
+ * Sets whether to ignore the updateBefore event.
+ *
+ * @param ignoreUpdateBefore
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
this.ignoreUpdateBefore = ignoreUpdateBefore;
return this;
}
+ /**
+ * Sets the write mode. Only STREAM_LOAD and STREAM_LOAD_BATCH are supported.
+ *
+ * @param writeMode
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setWriteMode(WriteMode writeMode) {
this.writeMode = writeMode;
return this;
}
+ /**
+ * Sets whether to ignore commit failure errors. Only valid with 2pc in non-batch mode.
+ * When ignored, data loss may occur.
+ *
+ * @param ignoreCommitError
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setIgnoreCommitError(boolean ignoreCommitError) {
this.ignoreCommitError = ignoreCommitError;
return this;
}
+ /**
+ * Build the {@link DorisExecutionOptions}.
+ *
+ * @return a DorisExecutionOptions with the settings made for this builder.
+ */
public DorisExecutionOptions build() {
// If format=json is set but read_json_by_line is not set, record may not be written.
if (streamLoadProp != null
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index bf6c7a28c..69273c9e0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -101,47 +101,89 @@ public static class Builder {
private boolean autoRedirect = true;
private String tableIdentifier;
- /** required, tableIdentifier. */
+ /**
+ * Sets the tableIdentifier for the DorisOptions.
+ *
+ * @param tableIdentifier Doris's database name and table name, such as db.tbl
+ * @return this DorisOptions.builder.
+ */
public Builder setTableIdentifier(String tableIdentifier) {
this.tableIdentifier = tableIdentifier;
return this;
}
- /** optional, user name. */
+ /**
+ * Sets the username of doris cluster.
+ *
+ * @param username Doris cluster username
+ * @return this DorisOptions.builder.
+ */
public Builder setUsername(String username) {
this.username = username;
return this;
}
- /** optional, password. */
+ /**
+ * Sets the password of doris cluster.
+ *
+ * @param password Doris cluster password
+ * @return this DorisOptions.builder.
+ */
public Builder setPassword(String password) {
this.password = password;
return this;
}
- /** required, Frontend Http Rest url. */
+ /**
+ * Sets the doris frontend http rest url, such as 127.0.0.1:8030,127.0.0.2:8030
+ *
+ * @param fenodes
+ * @return this DorisOptions.builder.
+ */
public Builder setFenodes(String fenodes) {
this.fenodes = fenodes;
return this;
}
- /** optional, Backend Http Port. */
+ /**
+ * Sets the doris backend http rest url, such as 127.0.0.1:8040,127.0.0.2:8040
+ *
+ * @param benodes
+ * @return this DorisOptions.builder.
+ */
public Builder setBenodes(String benodes) {
this.benodes = benodes;
return this;
}
- /** not required, fe jdbc url, for lookup query. */
+ /**
+ * Sets the doris fe jdbc url for lookup query, such as jdbc:mysql://127.0.0.1:9030
+ *
+ * @param jdbcUrl
+ * @return this DorisOptions.builder.
+ */
public Builder setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
return this;
}
+ /**
+ * Sets the autoRedirect for DorisOptions. If true, stream load will be written directly to
+ * fe. If false, it will first get the be list and write directly to be.
+ *
+ * @param autoRedirect
+ * @return this DorisOptions.builder.
+ */
public Builder setAutoRedirect(boolean autoRedirect) {
this.autoRedirect = autoRedirect;
return this;
}
+ /**
+ * Build the {@link DorisOptions}.
+ *
+ * @return a DorisOptions with the settings made for this builder.
+ */
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
// multi table load, don't need check
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 2f6cd8a86..0448d60a9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -187,6 +187,24 @@ public int hashCode() {
flightSqlPort);
}
+ public DorisReadOptions copy() {
+ return new DorisReadOptions(
+ readFields,
+ filterQuery,
+ requestTabletSize,
+ requestConnectTimeoutMs,
+ requestReadTimeoutMs,
+ requestQueryTimeoutS,
+ requestRetries,
+ requestBatchSize,
+ execMemLimit,
+ deserializeQueueSize,
+ deserializeArrowAsync,
+ useOldApi,
+ useFlightSql,
+ flightSqlPort);
+ }
+
/** Builder of {@link DorisReadOptions}. */
public static class Builder {
@@ -205,76 +223,169 @@ public static class Builder {
private Boolean useFlightSql = false;
private Integer flightSqlPort;
+ /**
+ * Sets the readFields for doris table to push down projection, such as name,age.
+ *
+ * @param readFields
+ * @return this DorisReadOptions.builder.
+ */
public Builder setReadFields(String readFields) {
this.readFields = readFields;
return this;
}
+ /**
+ * Sets the filterQuery for doris table to push down filter, such as age=18.
+ *
+ * @param filterQuery
+ * @return this DorisReadOptions.builder.
+ */
public Builder setFilterQuery(String filterQuery) {
this.filterQuery = filterQuery;
return this;
}
+ /**
+ * Sets the requestTabletSize for DorisReadOptions. The number of Doris Tablets
+ * corresponding to a Partition. The smaller this value is set, the more Partitions will be
+ * generated. This improves the parallelism on the Flink side, but at the same time puts
+ * more pressure on Doris.
+ *
+ * @param requestTabletSize
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestTabletSize(Integer requestTabletSize) {
this.requestTabletSize = requestTabletSize;
return this;
}
+ /**
+ * Sets the request connect timeout for DorisReadOptions.
+ *
+ * @param requestConnectTimeoutMs
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestConnectTimeoutMs(Integer requestConnectTimeoutMs) {
this.requestConnectTimeoutMs = requestConnectTimeoutMs;
return this;
}
+ /**
+ * Sets the request read timeout for DorisReadOptions.
+ *
+ * @param requestReadTimeoutMs
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestReadTimeoutMs(Integer requestReadTimeoutMs) {
this.requestReadTimeoutMs = requestReadTimeoutMs;
return this;
}
+ /**
+ * Sets the timeout for querying Doris for DorisReadOptions.
+ *
+ * @param requesQueryTimeoutS
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestQueryTimeoutS(Integer requesQueryTimeoutS) {
this.requestQueryTimeoutS = requesQueryTimeoutS;
return this;
}
+ /**
+ * Sets the number of retries to send requests to Doris for DorisReadOptions.
+ *
+ * @param requestRetries
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestRetries(Integer requestRetries) {
this.requestRetries = requestRetries;
return this;
}
+ /**
+ * Sets the read batch size for DorisReadOptions.
+ *
+ * @param requestBatchSize
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestBatchSize(Integer requestBatchSize) {
this.requestBatchSize = requestBatchSize;
return this;
}
+ /**
+ * Sets the Memory limit for a single query for DorisReadOptions.
+ *
+ * @param execMemLimit
+ * @return this DorisReadOptions.builder.
+ */
public Builder setExecMemLimit(Long execMemLimit) {
this.execMemLimit = execMemLimit;
return this;
}
+ /**
+ * Sets the size of the internal queue used for asynchronous Arrow format conversion.
+ *
+ * @param deserializeQueueSize
+ * @return this DorisReadOptions.builder.
+ */
public Builder setDeserializeQueueSize(Integer deserializeQueueSize) {
this.deserializeQueueSize = deserializeQueueSize;
return this;
}
+ /**
+ * Sets whether to support asynchronous conversion of Arrow format to RowBatch needed for
+ * connector iterations.
+ *
+ * @param deserializeArrowAsync
+ * @return this DorisReadOptions.builder.
+ */
public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) {
this.deserializeArrowAsync = deserializeArrowAsync;
return this;
}
- public Builder setUseFlightSql(Boolean useFlightSql) {
- this.useFlightSql = useFlightSql;
+ /**
+ * Sets whether to use the legacy source api.
+ *
+ * @param useOldApi
+ * @return this DorisReadOptions.builder.
+ */
+ public Builder setUseOldApi(Boolean useOldApi) {
+ this.useOldApi = useOldApi;
return this;
}
- public Builder setUseOldApi(Boolean useOldApi) {
- this.useOldApi = useOldApi;
+ /**
+ * Sets whether to use arrow flight sql for query. Only supported on Doris 2.1 and above.
+ *
+ * @param useFlightSql
+ * @return this DorisReadOptions.builder.
+ */
+ public Builder setUseFlightSql(Boolean useFlightSql) {
+ this.useFlightSql = useFlightSql;
return this;
}
+ /**
+ * Sets the flight sql port for DorisReadOptions.
+ *
+ * @param flightSqlPort
+ * @return this DorisReadOptions.builder.
+ */
public Builder setFlightSqlPort(Integer flightSqlPort) {
this.flightSqlPort = flightSqlPort;
return this;
}
+ /**
+ * Build the {@link DorisReadOptions}.
+ *
+ * @return a DorisReadOptions with the settings made for this builder.
+ */
public DorisReadOptions build() {
return new DorisReadOptions(
readFields,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index fd61d7fd9..d8e0d8277 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -161,26 +161,56 @@ public static class Builder {
private DorisExecutionOptions dorisExecutionOptions;
private DorisRecordSerializer serializer;
+ /**
+ * Sets the DorisOptions for the DorisSink.
+ *
+ * @param dorisOptions the common options of the doris cluster.
+ * @return this DorisSink.Builder.
+ */
public Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
return this;
}
+ /**
+ * Sets the DorisReadOptions for the DorisSink.
+ *
+ * @param dorisReadOptions the read options of the DorisSink.
+ * @return this DorisSink.Builder.
+ */
public Builder setDorisReadOptions(DorisReadOptions dorisReadOptions) {
this.dorisReadOptions = dorisReadOptions;
return this;
}
+ /**
+ * Sets the DorisExecutionOptions for the DorisSink.
+ *
+ * @param dorisExecutionOptions the execution options of the DorisSink.
+ * @return this DorisSink.Builder.
+ */
public Builder setDorisExecutionOptions(DorisExecutionOptions dorisExecutionOptions) {
this.dorisExecutionOptions = dorisExecutionOptions;
return this;
}
+ /**
+ * Sets the {@link DorisRecordSerializer serializer} that transforms incoming records to
+ * DorisRecord.
+ *
+ * @param serializer
+ * @return this DorisSink.Builder.
+ */
public Builder setSerializer(DorisRecordSerializer serializer) {
this.serializer = serializer;
return this;
}
+ /**
+ * Build the {@link DorisSink}.
+ *
+ * @return a DorisSink with the settings made for this builder.
+ */
public DorisSink build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 1b05453ad..5cdd406ce 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -27,7 +27,6 @@
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.StringUtils;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -65,18 +64,14 @@ public class DorisSource
private final Boundedness boundedness;
private final DorisDeserializationSchema deserializer;
- private final List resolvedFilterQuery;
-
public DorisSource(
DorisOptions options,
DorisReadOptions readOptions,
Boundedness boundedness,
- List resolvedFilterQuery,
DorisDeserializationSchema deserializer) {
this.options = options;
this.readOptions = readOptions;
this.boundedness = boundedness;
- this.resolvedFilterQuery = resolvedFilterQuery;
this.deserializer = deserializer;
}
@@ -100,15 +95,6 @@ public SourceReader createReader(SourceReaderContext read
public SplitEnumerator createEnumerator(
SplitEnumeratorContext context) throws Exception {
List dorisSourceSplits = new ArrayList<>();
- if (!resolvedFilterQuery.isEmpty()) {
- String filterQuery = String.join(" AND ", resolvedFilterQuery);
- if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
- readOptions.setFilterQuery(filterQuery);
- } else {
- readOptions.setFilterQuery(
- String.join(" AND ", readOptions.getFilterQuery(), filterQuery));
- }
- }
List partitions =
RestService.findPartitions(options, readOptions, LOG);
for (int index = 0; index < partitions.size(); index++) {
@@ -162,44 +148,61 @@ public static class DorisSourceBuilder {
// Boundedness
private Boundedness boundedness;
private DorisDeserializationSchema deserializer;
- private List resolvedFilterQuery = new ArrayList<>();
DorisSourceBuilder() {
boundedness = Boundedness.BOUNDED;
}
+ /**
+ * Sets the DorisOptions for the DorisSource.
+ *
+ * @param options the common options of the doris cluster.
+ * @return this DorisSourceBuilder.
+ */
public DorisSourceBuilder setDorisOptions(DorisOptions options) {
this.options = options;
return this;
}
+ /**
+ * Sets the DorisReadOptions for the DorisSource.
+ *
+ * @param readOptions the read options of the DorisSource.
+ * @return this DorisSourceBuilder.
+ */
public DorisSourceBuilder setDorisReadOptions(DorisReadOptions readOptions) {
this.readOptions = readOptions;
return this;
}
+ /** Sets the Boundedness for the DorisSource. Currently only BOUNDED is supported. */
public DorisSourceBuilder setBoundedness(Boundedness boundedness) {
this.boundedness = boundedness;
return this;
}
+ /**
+ * Sets the {@link DorisDeserializationSchema deserializer} of the Record for DorisSource.
+ *
+ * @param deserializer the deserializer for Doris Record.
+ * @return this DorisSourceBuilder.
+ */
public DorisSourceBuilder setDeserializer(
DorisDeserializationSchema deserializer) {
this.deserializer = deserializer;
return this;
}
- public DorisSourceBuilder setResolvedFilterQuery(List resolvedFilterQuery) {
- this.resolvedFilterQuery = resolvedFilterQuery;
- return this;
- }
-
+ /**
+ * Build the {@link DorisSource}.
+ *
+ * @return a DorisSource with the settings made for this builder.
+ */
public DorisSource build() {
if (readOptions == null) {
readOptions = DorisReadOptions.builder().build();
}
- return new DorisSource<>(
- options, readOptions, boundedness, resolvedFilterQuery, deserializer);
+ return new DorisSource<>(options, readOptions, boundedness, deserializer);
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index e55d3631b..9763a888a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -90,6 +90,11 @@ public ChangelogMode getChangelogMode() {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+ String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+ readOptions.setFilterQuery(filterQuery);
+ }
+
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
@@ -123,7 +128,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
DorisSource.builder()
.setDorisReadOptions(readOptions)
.setDorisOptions(options)
- .setResolvedFilterQuery(resolvedFilterQuery)
.setDeserializer(
new RowDataDeserializationSchema(
(RowType) physicalRowDataType.getLogicalType()))
@@ -166,9 +170,14 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
@Override
public DynamicTableSource copy() {
+ // copy readOptions, since its filterQuery/readFields may be overwritten in union all sql
DorisDynamicTableSource newSource =
new DorisDynamicTableSource(
- options, readOptions, lookupOptions, physicalSchema, physicalRowDataType);
+ options,
+ readOptions.copy(),
+ lookupOptions,
+ physicalSchema,
+ physicalRowDataType);
newSource.resolvedFilterQuery = new ArrayList<>(this.resolvedFilterQuery);
return newSource;
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 4fb6fba8f..6f1483018 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -63,6 +63,8 @@ public class DorisSourceITCase extends AbstractITCaseService {
"tbl_read_tbl_push_down_with_union_all";
static final String TABLE_CSV_JM = "tbl_csv_jm_source";
static final String TABLE_CSV_TM = "tbl_csv_tm_source";
+ private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER =
+ "tbl_read_tbl_push_down_with_union_all_not_eq_filter";
@Rule
public final MiniClusterWithClientResource miniClusterResource =
@@ -353,6 +355,51 @@ public void testTableSourceFilterWithUnionAll() throws Exception {
checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, actual.toArray());
}
+ @Test
+ public void testTableSourceFilterWithUnionAllNotEqualFilter() throws Exception {
+ LOG.info("starting to execute testTableSourceFilterWithUnionAllNotEqualFilter case.");
+ initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER);
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source_filter_with_union_all ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = '"
+ + DorisConfigOptions.IDENTIFIER
+ + "',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sourceDDL);
+ String querySql =
+ " SELECT * FROM doris_source_filter_with_union_all where name = 'doris'"
+ + " UNION ALL "
+ + "SELECT * FROM doris_source_filter_with_union_all where name in ('error','flink')";
+ TableResult tableResult = tEnv.executeSql(querySql);
+
+ List actual = new ArrayList<>();
+ try (CloseableIterator iterator = tableResult.collect()) {
+ while (iterator.hasNext()) {
+ actual.add(iterator.next().toString());
+ }
+ }
+
+ String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"};
+ checkResultInAnyOrder(
+ "testTableSourceFilterWithUnionAllNotEqualFilter", expected, actual.toArray());
+ }
+
@Test
public void testJobManagerFailoverSource() throws Exception {
LOG.info("start to test JobManagerFailoverSource.");