diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 375e43359..c5614c31a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -34,6 +34,7 @@ import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.writer.LabelGenerator; +import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -65,6 +66,8 @@ import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; +import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE; +import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; @@ -98,6 +101,7 @@ public class DorisBatchStreamLoad implements Serializable { private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch(); private BackendUtil backendUtil; private boolean enableGroupCommit; + private boolean enableGzCompress; public DorisBatchStreamLoad( DorisOptions dorisOptions, @@ -128,6 +132,7 @@ public DorisBatchStreamLoad( && !loadProps .getProperty(GROUP_COMMIT) .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE); + this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ); this.executionOptions = executionOptions; this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize()); if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) { @@ -285,6 +290,10 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { .addHiddenColumns(executionOptions.getDeletable()) .addProperties(executionOptions.getStreamLoadProp()); + if (enableGzCompress) { + putBuilder.setEntity(new GzipCompressingEntity(entity)); + } + Throwable resEx = new Throwable(); int retry = 0; while (retry <= executionOptions.getMaxRetries()) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 4cbcb431c..060bccb5f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -33,6 +33,7 @@ import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.ResponseUtil; +import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -56,6 +57,8 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN; import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; +import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE; +import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; @@ -90,6 +93,7 @@ public class DorisStreamLoad implements Serializable { private boolean loadBatchFirstRecord; private volatile String currentLabel; private boolean enableGroupCommit; + private boolean enableGzCompress; public DorisStreamLoad( String hostPort, @@ -137,6 +141,8 @@ public DorisStreamLoad( && !streamLoadProp .getProperty(GROUP_COMMIT) .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE); + this.enableGzCompress = + streamLoadProp.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ); loadBatchFirstRecord = true; } @@ -319,6 +325,10 @@ public void startLoad(String label, boolean isResume) throws IOException { putBuilder.enable2PC(); } + if (enableGzCompress) { + putBuilder.setEntity(new GzipCompressingEntity(entity)); + } + String executeMessage; if (enableGroupCommit) { executeMessage = "table " + table + " start execute load with group commit"; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java index e8cd87e64..1e026977a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java @@ -33,4 +33,6 @@ public class LoadConstants { public static final String READ_JSON_BY_LINE = "read_json_by_line"; public static final String GROUP_COMMIT = "group_commit"; public static final String GROUP_COMMIT_OFF_MODE = "off_mode"; + public static final String COMPRESS_TYPE = "compress_type"; + public static final String COMPRESS_TYPE_GZ = "gz"; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 91077ea2e..aa3d00dae 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -59,6 +59,7 @@ public class DorisSinkITCase extends DorisTestBase { static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl"; static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS"; static final String TABLE_GROUP_COMMIT = "tbl_group_commit"; + static final String TABLE_GZ_FORMAT = "tbl_gz_format"; static final String TABLE_CSV_JM = "tbl_csv_jm"; static final String TABLE_CSV_TM = "tbl_csv_tm"; @@ -315,6 +316,48 @@ public void testTableGroupCommit() throws Exception { checkResult(expected, query, 2); } + @Test + public void testTableGzFormat() throws Exception { + initializeTable(TABLE_GZ_FORMAT); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sinkDDL = + String.format( + "CREATE TABLE doris_gz_format_sink (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.label-prefix' = '" + + UUID.randomUUID() + + "'," + + " 'sink.properties.column_separator' = '\\x01'," + + " 'sink.properties.line_delimiter' = '\\x02'," + + " 'sink.properties.compress_type' = 'gz'" + + ")", + getFenodes(), + DATABASE + "." + TABLE_GZ_FORMAT, + USERNAME, + PASSWORD); + tEnv.executeSql(sinkDDL); + tEnv.executeSql( + "INSERT INTO doris_gz_format_sink SELECT 'doris',1 union all SELECT 'flink',2"); + + Thread.sleep(25000); + List expected = Arrays.asList("doris,1", "flink,2"); + String query = + String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_GZ_FORMAT); + // + checkResult(expected, query, 2); + } + @Test public void testJobManagerFailoverSink() throws Exception { initializeFailoverTable(TABLE_CSV_JM);