Skip to content

Commit

Permalink
support gz compress
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jul 18, 2024
1 parent 8ba89c4 commit e79f3e8
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -65,6 +66,8 @@
import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
Expand Down Expand Up @@ -98,6 +101,7 @@ public class DorisBatchStreamLoad implements Serializable {
private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch();
private BackendUtil backendUtil;
private boolean enableGroupCommit;
private boolean enableGzCompress;

public DorisBatchStreamLoad(
DorisOptions dorisOptions,
Expand Down Expand Up @@ -128,6 +132,7 @@ public DorisBatchStreamLoad(
&& !loadProps
.getProperty(GROUP_COMMIT)
.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
Expand Down Expand Up @@ -285,6 +290,10 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
.addHiddenColumns(executionOptions.getDeletable())
.addProperties(executionOptions.getStreamLoadProp());

if (enableGzCompress) {
putBuilder.setEntity(new GzipCompressingEntity(entity));
}

Throwable resEx = new Throwable();
int retry = 0;
while (retry <= executionOptions.getMaxRetries()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
Expand All @@ -56,6 +57,8 @@
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
Expand Down Expand Up @@ -90,6 +93,7 @@ public class DorisStreamLoad implements Serializable {
private boolean loadBatchFirstRecord;
private volatile String currentLabel;
private boolean enableGroupCommit;
private boolean enableGzCompress;

public DorisStreamLoad(
String hostPort,
Expand Down Expand Up @@ -137,6 +141,8 @@ public DorisStreamLoad(
&& !streamLoadProp
.getProperty(GROUP_COMMIT)
.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
this.enableGzCompress =
streamLoadProp.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
loadBatchFirstRecord = true;
}

Expand Down Expand Up @@ -319,6 +325,10 @@ public void startLoad(String label, boolean isResume) throws IOException {
putBuilder.enable2PC();
}

if (enableGzCompress) {
putBuilder.setEntity(new GzipCompressingEntity(entity));
}

String executeMessage;
if (enableGroupCommit) {
executeMessage = "table " + table + " start execute load with group commit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ public class LoadConstants {
public static final String READ_JSON_BY_LINE = "read_json_by_line";
public static final String GROUP_COMMIT = "group_commit";
public static final String GROUP_COMMIT_OFF_MODE = "off_mode";
public static final String COMPRESS_TYPE = "compress_type";
public static final String COMPRESS_TYPE_GZ = "gz";
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class DorisSinkITCase extends DorisTestBase {
static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
static final String TABLE_GZ_FORMAT = "tbl_gz_format";
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";

Expand Down Expand Up @@ -315,6 +316,48 @@ public void testTableGroupCommit() throws Exception {
checkResult(expected, query, 2);
}

/**
 * Integration test for the Table API sink with {@code sink.properties.compress_type} set to
 * {@code gz}: inserts two rows through a Flink SQL sink table and checks that both rows can be
 * read back from the target Doris table.
 *
 * @throws Exception if table initialization, job submission, or result verification fails
 */
@Test
public void testTableGzFormat() throws Exception {
    initializeTable(TABLE_GZ_FORMAT);

    // Single-parallelism batch job.
    final StreamExecutionEnvironment execEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setParallelism(1);
    execEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);

    // The label prefix is a fresh random UUID on every run; it is concatenated into the
    // template before the remaining connection placeholders are filled in.
    final String ddlTemplate =
            "CREATE TABLE doris_gz_format_sink ("
                    + " name STRING,"
                    + " age INT"
                    + ") WITH ("
                    + " 'connector' = 'doris',"
                    + " 'fenodes' = '%s',"
                    + " 'table.identifier' = '%s',"
                    + " 'username' = '%s',"
                    + " 'password' = '%s',"
                    + " 'sink.label-prefix' = '"
                    + UUID.randomUUID()
                    + "',"
                    + " 'sink.properties.column_separator' = '\\x01',"
                    + " 'sink.properties.line_delimiter' = '\\x02',"
                    + " 'sink.properties.compress_type' = 'gz'"
                    + ")";
    final String targetTable = DATABASE + "." + TABLE_GZ_FORMAT;
    final String createSinkSql =
            String.format(ddlTemplate, getFenodes(), targetTable, USERNAME, PASSWORD);
    tableEnv.executeSql(createSinkSql);

    final String insertSql =
            "INSERT INTO doris_gz_format_sink SELECT 'doris',1 union all SELECT 'flink',2";
    tableEnv.executeSql(insertSql);

    // Fixed 25 s pause before reading back; presumably gives the submitted job time to finish
    // loading into Doris (NOTE(review): confirm, a bounded wait on the job result may be safer).
    Thread.sleep(25000);

    final String verifyQuery =
            String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_GZ_FORMAT);
    final List<String> expectedRows = Arrays.asList("doris,1", "flink,2");
    // Compare the rows read back from Doris with the two inserted rows
    // (the trailing 2 is presumably the number of selected columns; see checkResult).
    checkResult(expectedRows, verifyQuery, 2);
}

@Test
public void testJobManagerFailoverSink() throws Exception {
initializeFailoverTable(TABLE_CSV_JM);
Expand Down

0 comments on commit e79f3e8

Please sign in to comment.