From 8095a67cb4bf8ec3f4fd54d8368f848423deec4b Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Wed, 22 Nov 2023 15:38:33 +0800
Subject: [PATCH 1/2] [improve] change option default when format is json
(#236)
If format=json is set but read_json_by_line is not set, record may not be written.
---
.../apache/doris/flink/cfg/DorisExecutionOptions.java | 10 ++++++++++
.../java/org/apache/doris/flink/cfg/DorisOptions.java | 3 ++-
.../apache/doris/flink/sink/writer/LoadConstants.java | 1 +
3 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 4a03024dd..9a43d77ff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -22,6 +22,10 @@
import java.io.Serializable;
import java.util.Properties;
+import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE;
+
/**
* Doris sink batch options.
*/
@@ -297,6 +301,12 @@ public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
}
public DorisExecutionOptions build() {
+ //If format=json is set but read_json_by_line is not set, record may not be written.
+ if(streamLoadProp != null
+ && streamLoadProp.containsKey(FORMAT_KEY)
+ && JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))){
+ streamLoadProp.put(READ_JSON_BY_LINE, true);
+ }
return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, useCache,
streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows,
bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore, force2PC);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index 6391e9142..cb7565f39 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -132,7 +132,8 @@ public Builder setAutoRedirect(boolean autoRedirect) {
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
- checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
+ //multi table load, don't need check
+ //checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl, autoRedirect);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index e6566e39a..d38f9602d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -31,5 +31,6 @@ public class LoadConstants {
public static final String CSV = "csv";
public static final String NULL_VALUE = "\\N";
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
+ public static final String READ_JSON_BY_LINE = "read_json_by_line";
}
From 1b90428ea8bc0768e9059c721b85748962de5008 Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Wed, 22 Nov 2023 15:46:26 +0800
Subject: [PATCH 2/2] [Feature] Support Flink1.18 (#235)
---
flink-doris-connector/build.sh | 17 +++++---
flink-doris-connector/pom.xml | 40 ++++++++++++++++---
.../table/DorisRowDataJdbcLookupFunction.java | 4 +-
3 files changed, 48 insertions(+), 13 deletions(-)
diff --git a/flink-doris-connector/build.sh b/flink-doris-connector/build.sh
index 5816e86eb..a646f5859 100755
--- a/flink-doris-connector/build.sh
+++ b/flink-doris-connector/build.sh
@@ -116,7 +116,7 @@ fi
selectFlink() {
echo 'Flink-Doris-Connector supports multiple versions of flink. Which version do you need ?'
- select flink in "1.15.x" "1.16.x" "1.17.x"
+ select flink in "1.15.x" "1.16.x" "1.17.x" "1.18.x"
do
case $flink in
"1.15.x")
@@ -128,6 +128,9 @@ selectFlink() {
"1.17.x")
return 3
;;
+ "1.18.x")
+ return 4
+ ;;
*)
echo "invalid selected, exit.."
exit 1
@@ -145,17 +148,19 @@ elif [ ${flinkVer} -eq 2 ]; then
FLINK_VERSION="1.16.0"
elif [ ${flinkVer} -eq 3 ]; then
FLINK_VERSION="1.17.0"
+elif [ ${flinkVer} -eq 4 ]; then
+ FLINK_VERSION="1.18.0"
fi
-# extract minor version:
+# extract major version:
# eg: 3.1.2 -> 3
-FLINK_MINOR_VERSION=0
-[ ${FLINK_VERSION} != 0 ] && FLINK_MINOR_VERSION=${FLINK_VERSION%.*}
+FLINK_MAJOR_VERSION=0
+[ ${FLINK_VERSION} != 0 ] && FLINK_MAJOR_VERSION=${FLINK_VERSION%.*}
-echo_g " flink version: ${FLINK_VERSION}, minor version: ${FLINK_MINOR_VERSION}"
+echo_g " flink version: ${FLINK_VERSION}, major version: ${FLINK_MAJOR_VERSION}"
echo_g " build starting..."
-${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.minor.version=${FLINK_MINOR_VERSION} "$@"
+${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} "$@"
EXIT_CODE=$?
if [ $EXIT_CODE -eq 0 ]; then
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index c96a83389..13a7ec471 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -26,7 +26,7 @@ under the License.
23
org.apache.doris
- flink-doris-connector-${flink.minor.version}
+ flink-doris-connector-${flink.major.version}
${revision}
Flink Doris Connector
https://doris.apache.org/
@@ -68,8 +68,8 @@ under the License.
1.5.0-SNAPSHOT
- 1.16.0
- 1.16
+ 1.18.0
+ 1.18
2.4.1
0.16.0
5.0.0
@@ -214,6 +214,12 @@ under the License.
2.13.3
+
+ com.google.guava
+ guava
+ 31.1-jre
+
+
org.hamcrest
@@ -254,24 +260,48 @@ under the License.
flink-sql-connector-mysql-cdc
${flink.sql.cdc.version}
provided
+
+
+ flink-shaded-guava
+ org.apache.flink
+
+
com.ververica
flink-sql-connector-oracle-cdc
${flink.sql.cdc.version}
provided
+
+
+ flink-shaded-guava
+ org.apache.flink
+
+
com.ververica
flink-sql-connector-postgres-cdc
${flink.sql.cdc.version}
provided
+
+
+ flink-shaded-guava
+ org.apache.flink
+
+
com.ververica
flink-sql-connector-sqlserver-cdc
${flink.sql.cdc.version}
provided
+
+
+ flink-shaded-guava
+ org.apache.flink
+
+
org.apache.flink
@@ -321,8 +351,8 @@ under the License.
org.apache.doris.shaded.org.apache.commons.codec
- com.google.flatbuffers
- org.apache.doris.shaded.com.google.flatbuffers
+ com.google
+ org.apache.doris.shaded.com.google
org.apache.thrift
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
index 00a6c77e3..bb4d6715a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
@@ -23,8 +23,8 @@
import org.apache.doris.flink.lookup.DorisLookupReader;
import org.apache.doris.flink.lookup.LookupSchema;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;