Skip to content

Commit

Permalink
Merge branch 'master' into schema-change-abstact
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Nov 22, 2023
2 parents d3217fb + 1b90428 commit 11e7547
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 14 deletions.
17 changes: 11 additions & 6 deletions flink-doris-connector/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ fi

selectFlink() {
echo 'Flink-Doris-Connector supports multiple versions of flink. Which version do you need ?'
select flink in "1.15.x" "1.16.x" "1.17.x"
select flink in "1.15.x" "1.16.x" "1.17.x" "1.18.x"
do
case $flink in
"1.15.x")
Expand All @@ -128,6 +128,9 @@ selectFlink() {
"1.17.x")
return 3
;;
"1.18.x")
return 4
;;
*)
echo "invalid selected, exit.."
exit 1
Expand All @@ -145,17 +148,19 @@ elif [ ${flinkVer} -eq 2 ]; then
FLINK_VERSION="1.16.0"
elif [ ${flinkVer} -eq 3 ]; then
FLINK_VERSION="1.17.0"
elif [ ${flinkVer} -eq 4 ]; then
FLINK_VERSION="1.18.0"
fi

# extract minor version:
# extract major version:
# eg: 3.1.2 -> 3
FLINK_MINOR_VERSION=0
[ ${FLINK_VERSION} != 0 ] && FLINK_MINOR_VERSION=${FLINK_VERSION%.*}
FLINK_MAJOR_VERSION=0
[ ${FLINK_VERSION} != 0 ] && FLINK_MAJOR_VERSION=${FLINK_VERSION%.*}

echo_g " flink version: ${FLINK_VERSION}, minor version: ${FLINK_MINOR_VERSION}"
echo_g " flink version: ${FLINK_VERSION}, major version: ${FLINK_MAJOR_VERSION}"
echo_g " build starting..."

${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.minor.version=${FLINK_MINOR_VERSION} "$@"
${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} "$@"

EXIT_CODE=$?
if [ $EXIT_CODE -eq 0 ]; then
Expand Down
40 changes: 35 additions & 5 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ under the License.
<version>23</version>
</parent>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-${flink.minor.version}</artifactId>
<artifactId>flink-doris-connector-${flink.major.version}</artifactId>
<version>${revision}</version>
<name>Flink Doris Connector</name>
<url>https://doris.apache.org/</url>
Expand Down Expand Up @@ -68,8 +68,8 @@ under the License.

<properties>
<revision>1.5.0-SNAPSHOT</revision>
<flink.version>1.16.0</flink.version>
<flink.minor.version>1.16</flink.minor.version>
<flink.version>1.18.0</flink.version>
<flink.major.version>1.18</flink.major.version>
<flink.sql.cdc.version>2.4.1</flink.sql.cdc.version>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>5.0.0</arrow.version>
Expand Down Expand Up @@ -214,6 +214,12 @@ under the License.
<version>2.13.3</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>

<!--Test-->
<dependency>
<groupId>org.hamcrest</groupId>
Expand Down Expand Up @@ -254,24 +260,48 @@ under the License.
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down Expand Up @@ -321,8 +351,8 @@ under the License.
<shadedPattern>org.apache.doris.shaded.org.apache.commons.codec</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.flatbuffers</pattern>
<shadedPattern>org.apache.doris.shaded.com.google.flatbuffers</shadedPattern>
<pattern>com.google</pattern>
<shadedPattern>org.apache.doris.shaded.com.google</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.thrift</pattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import java.io.Serializable;
import java.util.Properties;

import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE;

/**
* Doris sink batch options.
*/
Expand Down Expand Up @@ -297,6 +301,12 @@ public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
}

public DorisExecutionOptions build() {
//If format=json is set but read_json_by_line is not set, record may not be written.
if(streamLoadProp != null
&& streamLoadProp.containsKey(FORMAT_KEY)
&& JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))){
streamLoadProp.put(READ_JSON_BY_LINE, true);
}
return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, useCache,
streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows,
bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore, force2PC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public Builder setAutoRedirect(boolean autoRedirect) {

public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
//multi table load, don't need check
//checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl, autoRedirect);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ public class LoadConstants {
public static final String CSV = "csv";
public static final String NULL_VALUE = "\\N";
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String READ_JSON_BY_LINE = "read_json_by_line";

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.doris.flink.lookup.DorisLookupReader;
import org.apache.doris.flink.lookup.LookupSchema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
Expand Down

0 comments on commit 11e7547

Please sign in to comment.