Skip to content

Commit

Permalink
refactor serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Nov 21, 2023
1 parent 5449864 commit 16d37f6
Show file tree
Hide file tree
Showing 16 changed files with 94 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer) {
public DorisSink<IN> build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
Preconditions.checkNotNull(serializer);
if(dorisReadOptions == null) {
dorisReadOptions = DorisReadOptions.builder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public DorisBatchSink.Builder<IN> setSerializer(DorisRecordSerializer<IN> serial
public DorisBatchSink<IN> build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
Preconditions.checkNotNull(serializer);
if(dorisReadOptions == null) {
dorisReadOptions = DorisReadOptions.builder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -92,24 +91,20 @@ private void intervalFlush() {
@Override
public void write(IN in, Context context) throws IOException, InterruptedException {
checkFlushException();
if(in instanceof RecordWithMeta){
RecordWithMeta row = (RecordWithMeta) in;
if(StringUtils.isNullOrWhitespaceOnly(row.getTable())
||StringUtils.isNullOrWhitespaceOnly(row.getDatabase())
||row.getRecord() == null){
LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
return;
}
batchStreamLoad.writeRecord(row.getDatabase(), row.getTable(), row.getRecord().getBytes(StandardCharsets.UTF_8));
String db = this.database;
String tbl = this.table;
Tuple2<String, byte[]> rowTuple = serializer.serialize(in);
if(rowTuple == null || rowTuple.f1 == null){
//ddl or value is null
return;
}

byte[] serialize = serializer.serialize(in);
if(Objects.isNull(serialize)){
//ddl record: no row payload to load, skip it
return;
//multi table load
if(rowTuple.f0 != null){
String[] tableInfo = rowTuple.f0.split("\\.");
db = tableInfo[0];
tbl = tableInfo[1];
}
batchStreamLoad.writeRecord(database, table, serialize);
batchStreamLoad.writeRecord(db, tbl, rowTuple.f1);
}
@Override
public void flush(boolean flush) throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.flink.sink.writer;

import org.apache.flink.api.java.tuple.Tuple2;

import java.io.IOException;
import java.io.Serializable;

Expand All @@ -29,8 +31,9 @@ public interface DorisRecordSerializer<T> extends Serializable {
/**
* define how to convert record into byte array.
* @param record
* @return byte array
* @return Tuple2 of [tableIdentifier, serialized byte array]; the identifier may be null for single-table load
* @throws IOException
*/
byte[] serialize(T record) throws IOException;
Tuple2<String, byte[]> serialize(T record) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.batch.RecordWithMeta;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
Expand All @@ -39,14 +38,12 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -152,12 +149,17 @@ private void abortLingeringTransactions(Collection<DorisWriterState> recoveredSt
@Override
public void write(IN in, Context context) throws IOException {
checkLoadException();
Tuple2<String, byte[]> rowTuple = serializeRecord(in);
String tableKey = rowTuple.f0;
byte[] serializeRow = rowTuple.f1;
if(serializeRow == null){
String tableKey = dorisOptions.getTableIdentifier();

Tuple2<String, byte[]> rowTuple = serializer.serialize(in);
if(rowTuple == null || rowTuple.f1 == null){
//ddl or value is null
return;
}
//multi table load
if(rowTuple.f0 != null){
tableKey = rowTuple.f0;
}

DorisStreamLoad streamLoader = getStreamLoader(tableKey);
if(!loadingMap.containsKey(tableKey)) {
Expand All @@ -168,32 +170,7 @@ public void write(IN in, Context context) throws IOException {
loadingMap.put(tableKey, true);
globalLoading = true;
}
streamLoader.writeRecord(serializeRow);
}

private Tuple2<String, byte[]> serializeRecord(IN in) throws IOException {
String tableKey = dorisOptions.getTableIdentifier();
byte[] serializeRow = null;
if(serializer != null) {
serializeRow = serializer.serialize(in);
if(Objects.isNull(serializeRow)){
//ddl record by JsonDebeziumSchemaSerializer
return Tuple2.of(tableKey, null);
}
}
//multi table load
if(in instanceof RecordWithMeta){
RecordWithMeta row = (RecordWithMeta) in;
if(StringUtils.isBlank(row.getTable())
|| StringUtils.isBlank(row.getDatabase())
|| row.getRecord() == null){
LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
return Tuple2.of(tableKey, null);
}
tableKey = row.getDatabase() + "." + row.getTable();
serializeRow = row.getRecord().getBytes(StandardCharsets.UTF_8);
}
return Tuple2.of(tableKey, serializeRow);
streamLoader.writeRecord(rowTuple.f1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -132,7 +133,7 @@ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
}

@Override
public byte[] serialize(String record) throws IOException {
public Tuple2<String, byte[]> serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
String op = extractJsonNode(recordRoot, "op");
Expand All @@ -157,7 +158,7 @@ public byte[] serialize(String record) throws IOException {
addDeleteSign(valueMap, false);
break;
case OP_UPDATE:
return extractUpdate(recordRoot);
return Tuple2.of(null, extractUpdate(recordRoot));
case OP_DELETE:
valueMap = extractBeforeRow(recordRoot);
addDeleteSign(valueMap, true);
Expand All @@ -166,7 +167,7 @@ public byte[] serialize(String record) throws IOException {
LOG.error("parse record fail, unknown op {} in {}", op, record);
return null;
}
return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
return Tuple2.of(null, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.apache.doris.flink.sink.writer;

import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.sink.batch.RecordWithMeta;
import org.apache.flink.api.java.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Serializer for {@link RecordWithMeta} payloads: validates the embedded
 * database/table metadata and, when valid, emits the row bytes keyed by the
 * "database.table" identifier expected by multi-table stream load.
 */
public class RecordWithMetaSerializer implements DorisRecordSerializer<RecordWithMeta> {
    private static final Logger LOG = LoggerFactory.getLogger(RecordWithMetaSerializer.class);

    @Override
    public Tuple2<String, byte[]> serialize(RecordWithMeta record) throws IOException {
        boolean metaIncomplete = StringUtils.isBlank(record.getTable())
                || StringUtils.isBlank(record.getDatabase())
                || record.getRecord() == null;
        if (metaIncomplete) {
            // Missing db/table/row: log and return null so the writer skips this record.
            LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}",
                    record.getDatabase(), record.getTable(), record.getRecord());
            return null;
        }
        // Target-table identifier in "<database>.<table>" form for multi-table load.
        String identifier = record.getDatabase() + "." + record.getTable();
        byte[] payload = record.getRecord().getBytes(StandardCharsets.UTF_8);
        return Tuple2.of(identifier, payload);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
Expand Down Expand Up @@ -59,7 +60,7 @@ private RowDataSerializer(String[] fieldNames, DataType[] dataTypes, String type
}

@Override
public byte[] serialize(RowData record) throws IOException{
public Tuple2<String, byte[]> serialize(RowData record) throws IOException{
int maxIndex = Math.min(record.getArity(), fieldNames.length);
String valString;
if (JSON.equals(type)) {
Expand All @@ -69,7 +70,7 @@ public byte[] serialize(RowData record) throws IOException{
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
return valString.getBytes(StandardCharsets.UTF_8);
return Tuple2.of(null, valString.getBytes(StandardCharsets.UTF_8));
}

public String buildJsonString(RowData record, int maxIndex) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

package org.apache.doris.flink.sink.writer;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;

Expand Down Expand Up @@ -53,7 +55,7 @@ private RowSerializer(String[] fieldNames, DataType[] dataTypes, String type, St
}

@Override
public byte[] serialize(Row record) throws IOException{
public Tuple2<String, byte[]> serialize(Row record) throws IOException{
RowData rowDataRecord = this.rowRowConverter.toInternal(record);
return this.rowDataSerializer.serialize(rowDataRecord);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.flink.sink.writer;

import org.apache.flink.api.java.tuple.Tuple2;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

Expand All @@ -26,7 +28,7 @@
public class SimpleStringSerializer implements DorisRecordSerializer<String> {

@Override
public byte[] serialize(String record) throws IOException {
return record.getBytes(StandardCharsets.UTF_8);
public Tuple2<String, byte[]> serialize(String record) throws IOException {
return Tuple2.of(null, record.getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.batch.RecordWithMeta;
import org.apache.doris.flink.sink.writer.RecordWithMetaSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

Expand Down Expand Up @@ -67,7 +68,8 @@ public static void main(String[] args) throws Exception {

builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build());
.setDorisOptions(dorisBuilder.build())
.setSerializer(new RecordWithMetaSerializer());

// RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
// RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.batch.RecordWithMeta;
import org.apache.doris.flink.sink.writer.RecordWithMetaSerializer;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -61,7 +62,8 @@ public static void main(String[] args) throws Exception {

builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build());
.setDorisOptions(dorisBuilder.build())
.setSerializer(new RecordWithMetaSerializer());

// RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
// RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testConvert() throws IOException {
.setFieldDelimiter("|")
.setFieldNames(new String[]{"f1","f2","f3","f4","f5","f6","f7","f8","f9","f10","f11","f12","f13","f14","f15","f16"})
.build();
String s = new String(serializer.serialize(rowData));
String s = new String(serializer.serialize(rowData).f1);
Assert.assertEquals("\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris", s);
}

Expand Down
Loading

0 comments on commit 16d37f6

Please sign in to comment.