Skip to content

Commit

Permalink
[feature]add arrow type for streamload (apache#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwenchi authored Dec 27, 2023
1 parent f658c61 commit bad24da
Show file tree
Hide file tree
Showing 14 changed files with 376 additions and 50 deletions.
23 changes: 22 additions & 1 deletion .github/workflows/build-extension.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,26 @@ jobs:
run: |
cd flink-doris-connector && mvn clean package \
-Dflink.version=1.15.0 \
-Dflink.minor.version=1.15
-Dflink.minor.version=1.15 \
-Dflink.python.id=flink-python_2.12
- name: Build flink connector 1.16
run: |
cd flink-doris-connector && mvn clean package \
-Dflink.version=1.16.0 \
-Dflink.minor.version=1.16 \
-Dflink.python.id=flink-python
- name: Build flink connector 1.17
run: |
cd flink-doris-connector && mvn clean package \
-Dflink.version=1.17.0 \
-Dflink.minor.version=1.17 \
-Dflink.python.id=flink-python
- name: Build flink connector 1.18
run: |
cd flink-doris-connector && mvn clean package \
-Dflink.version=1.18.0 \
-Dflink.minor.version=1.18 \
-Dflink.python.id=flink-python
4 changes: 3 additions & 1 deletion flink-doris-connector/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ selectFlink() {
FLINK_VERSION=0
selectFlink
flinkVer=$?
FLINK_PYTHON_ID="flink-python"
if [ ${flinkVer} -eq 1 ]; then
FLINK_VERSION="1.15.0"
FLINK_PYTHON_ID="flink-python_2.12"
elif [ ${flinkVer} -eq 2 ]; then
FLINK_VERSION="1.16.0"
elif [ ${flinkVer} -eq 3 ]; then
Expand All @@ -160,7 +162,7 @@ FLINK_MAJOR_VERSION=0
echo_g " flink version: ${FLINK_VERSION}, major version: ${FLINK_MAJOR_VERSION}"
echo_g " build starting..."

${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} "$@"
${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} -Dflink.python.id=${FLINK_PYTHON_ID} "$@"

EXIT_CODE=$?
if [ $EXIT_CODE -eq 0 ]; then
Expand Down
21 changes: 9 additions & 12 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ under the License.
<flink.version>1.18.0</flink.version>
<flink.major.version>1.18</flink.major.version>
<flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
<flink.python.id>flink-python</flink.python.id>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>5.0.0</arrow.version>
<arrow.version>13.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
Expand All @@ -84,7 +85,6 @@ under the License.
<spotless.version>2.4.2</spotless.version>
<httpcomponents.version>4.5.13</httpcomponents.version>
<commons-codec.version>1.15</commons-codec.version>
<netty.version>4.1.77.Final</netty.version>
<fasterxml.version>2.13.3</fasterxml.version>
<guava.version>31.1-jre</guava.version>
<slf4j.version>1.7.25</slf4j.version>
Expand Down Expand Up @@ -137,6 +137,13 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.python.id}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
Expand Down Expand Up @@ -195,19 +202,9 @@ under the License.
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
</dependency>

<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void insert(byte[] record) {
ensureCapacity(record.length);
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
} else {
} else if (lineDelimiter != null) {
this.buffer.put(this.lineDelimiter);
}
this.buffer.put(record);
Expand All @@ -67,7 +67,7 @@ public void insert(byte[] record) {

@VisibleForTesting
public void ensureCapacity(int length) {
int lineDelimiterSize = this.lineDelimiter.length;
int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
if (buffer.remaining() - lineDelimiterSize >= length) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@

import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

Expand Down Expand Up @@ -105,10 +108,15 @@ public DorisBatchStreamLoad(
this.password = dorisOptions.getPassword();
this.loadProps = executionOptions.getStreamLoadProp();
this.labelGenerator = labelGenerator;
this.lineDelimiter =
EscapeHandler.escapeString(
loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
this.lineDelimiter = null;
} else {
this.lineDelimiter =
EscapeHandler.escapeString(
loadProps.getProperty(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
}
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,15 @@ public DorisBatchWriter(
this.dorisReadOptions = dorisReadOptions;
this.executionOptions = executionOptions;
this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
serializer.initial();
}

public void initializeLoad() throws IOException {
this.batchStreamLoad =
new DorisBatchStreamLoad(
dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
// when uploading data in streaming mode,
// we need to regularly detect whether there areexceptions.
// when uploading data in streaming mode, we need to regularly detect whether there are
// exceptions.
scheduledExecutorService.scheduleWithFixedDelay(
this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
}
Expand All @@ -101,13 +102,24 @@ private void intervalFlush() {
@Override
public void write(IN in, Context context) throws IOException, InterruptedException {
checkFlushException();
String db = this.database;
String tbl = this.table;
DorisRecord record = serializer.serialize(in);
writeOneDorisRecord(serializer.serialize(in));
}

@Override
public void flush(boolean flush) throws IOException, InterruptedException {
checkFlushException();
writeOneDorisRecord(serializer.flush());
LOG.info("checkpoint flush triggered.");
batchStreamLoad.flush(null, true);
}

public void writeOneDorisRecord(DorisRecord record) throws InterruptedException {
if (record == null || record.getRow() == null) {
// ddl or value is null
return;
}
String db = this.database;
String tbl = this.table;
// multi table load
if (record.getTableIdentifier() != null) {
db = record.getDatabase();
Expand All @@ -116,13 +128,6 @@ public void write(IN in, Context context) throws IOException, InterruptedExcepti
batchStreamLoad.writeRecord(db, tbl, record.getRow());
}

@Override
public void flush(boolean flush) throws IOException, InterruptedException {
checkFlushException();
LOG.info("checkpoint flush triggered.");
batchStreamLoad.flush(null, true);
}

@Override
public void close() throws Exception {
LOG.info("DorisBatchWriter Close");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

Expand Down Expand Up @@ -115,11 +118,15 @@ public DorisStreamLoad(
executionOptions.getBufferSize(),
executionOptions.getBufferCount(),
executionOptions.isUseCache());
lineDelimiter =
EscapeHandler.escapeString(
streamLoadProp.getProperty(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
if (streamLoadProp.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
lineDelimiter = null;
} else {
lineDelimiter =
EscapeHandler.escapeString(
streamLoadProp.getProperty(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
}
loadBatchFirstRecord = true;
}

Expand Down Expand Up @@ -157,8 +164,8 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
while (true) {
try {
// TODO: According to label abort txn. Currently,
// it can only be aborted based on txnid,
// TODO: According to label abort txn. Currently, it can only be aborted based on
// txnid,
// so we must first request a streamload based on the label to get the txnid.
String label = labelGenerator.generateTableLabel(startChkID);
HttpPutBuilder builder = new HttpPutBuilder();
Expand Down Expand Up @@ -218,7 +225,7 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
public void writeRecord(byte[] record) throws IOException {
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
} else {
} else if (lineDelimiter != null) {
recordStream.write(lineDelimiter);
}
recordStream.write(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public DorisWriter(
this.globalLoading = false;

initializeLoad(state);
serializer.initial();
}

public void initializeLoad(Collection<DorisWriterState> state) {
Expand All @@ -123,8 +124,8 @@ public void initializeLoad(Collection<DorisWriterState> state) {
}
// get main work thread.
executorThread = Thread.currentThread();
// when uploading data in streaming mode,
// we need to regularly detect whether there are exceptions.
// when uploading data in streaming mode, we need to regularly detect whether there are
// exceptions.
scheduledExecutorService.scheduleWithFixedDelay(
this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -167,14 +168,23 @@ private void abortLingeringTransactions(Collection<DorisWriterState> recoveredSt
@Override
public void write(IN in, Context context) throws IOException {
checkLoadException();
String tableKey = dorisOptions.getTableIdentifier();
writeOneDorisRecord(serializer.serialize(in));
}

@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
writeOneDorisRecord(serializer.flush());
}

public void writeOneDorisRecord(DorisRecord record) throws IOException {

DorisRecord record = serializer.serialize(in);
if (record == null || record.getRow() == null) {
// ddl or value is null
return;
}

// multi table load
String tableKey = dorisOptions.getTableIdentifier();
if (record.getTableIdentifier() != null) {
tableKey = record.getTableIdentifier();
}
Expand All @@ -191,11 +201,6 @@ public void write(IN in, Context context) throws IOException {
streamLoader.writeRecord(record.getRow());
}

@Override
public void flush(boolean flush) throws IOException, InterruptedException {
// No action is triggered, everything is in the precommit method
}

@Override
public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
// Verify whether data is written during a checkpoint
Expand Down Expand Up @@ -369,5 +374,6 @@ public void close() throws Exception {
dorisStreamLoad.close();
}
}
serializer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class LoadConstants {
public static final String FORMAT_KEY = "format";
public static final String JSON = "json";
public static final String CSV = "csv";
public static final String ARROW = "arrow";
public static final String NULL_VALUE = "\\N";
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String READ_JSON_BY_LINE = "read_json_by_line";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.io.Serializable;

public class DorisRecord implements Serializable {

public static DorisRecord empty = new DorisRecord();

private String database;
private String table;
private byte[] row;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ public interface DorisRecordSerializer<T> extends Serializable {
* @throws IOException
*/
DorisRecord serialize(T record) throws IOException;

/**
 * Optional initialization hook for the serializer; no-op by default.
 * Called once by the writer before any record is serialized (see the
 * DorisWriter / DorisBatchWriter constructors, which invoke
 * {@code serializer.initial()}).
 */
default void initial() {}

/**
 * Optional flush hook: returns any record the serializer has buffered but not
 * yet emitted, so the writer can push it out at checkpoint time. Defaults to
 * {@code DorisRecord.empty}, whose null row the writers treat as "nothing to
 * write" (they return early when {@code record.getRow() == null}).
 */
default DorisRecord flush() {
return DorisRecord.empty;
}

/**
 * Optional cleanup hook; no-op by default. Invoked from the writer's
 * {@code close()} (see DorisWriter.close, which calls {@code serializer.close()}),
 * so implementations can release serializer-held resources here.
 *
 * @throws Exception if releasing resources fails
 */
default void close() throws Exception {}
}
Loading

0 comments on commit bad24da

Please sign in to comment.