From c9388452591376d76529a06fcf8ec58e0f59c01b Mon Sep 17 00:00:00 2001
From: amory
Date: Tue, 25 Jun 2024 22:22:12 +0800
Subject: [PATCH 1/2] [fix](array) fix array_except/union returning only one
 row when the left argument is const (#36776)

If the left argument of array_except/union is const, the returned column
contains only one row, which makes the result wrong.
---
 be/src/vec/functions/array/function_array_set.h   |  6 +++++-
 .../array_functions/test_array_functions.out      | 12 ++++++++++++
 .../array_functions/test_array_functions.groovy   |  3 +++
 3 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/be/src/vec/functions/array/function_array_set.h b/be/src/vec/functions/array/function_array_set.h
index 41bf53f921f3fb..fe51863d9ea422 100644
--- a/be/src/vec/functions/array/function_array_set.h
+++ b/be/src/vec/functions/array/function_array_set.h
@@ -181,7 +181,11 @@ struct ArraySetImpl {
         constexpr auto execute_left_column_first = Impl::Action::execute_left_column_first;
         size_t current = 0;
         Impl impl;
-        for (size_t row = 0; row < left_data.offsets_ptr->size(); ++row) {
+        size_t row_size = left_data.offsets_ptr->size();
+        if constexpr (LCONST) {
+            row_size = right_data.offsets_ptr->size();
+        }
+        for (size_t row = 0; row < row_size; ++row) {
             size_t count = 0;
             size_t left_off = (*left_data.offsets_ptr)[index_check_const(row, LCONST) - 1];
             size_t left_len = (*left_data.offsets_ptr)[index_check_const(row, LCONST)] - left_off;
diff --git a/regression-test/data/query_p0/sql_functions/array_functions/test_array_functions.out b/regression-test/data/query_p0/sql_functions/array_functions/test_array_functions.out
index 7654d789dd0a58..0c209f32cf55cc 100644
--- a/regression-test/data/query_p0/sql_functions/array_functions/test_array_functions.out
+++ b/regression-test/data/query_p0/sql_functions/array_functions/test_array_functions.out
@@ -1659,14 +1659,26 @@
 10005	[10005, null, null]	[null, 3, 10005, 2, 1]
 10006	[60002, 60002, 60003, null, 60005]	[null, 3, 60002, 60005, 60003, 2, 1]
 
+-- !select_union_left_const --
+10005	[10005, null, null]	[null, 3, 10005, 2, 1]
+10006	[60002, 60002, 60003, null, 60005]	[null, 3, 60002, 60005, 60003, 2, 1]
+
 -- !select_except --
 10005	[10005, null, null]	[10005, null]
 10006	[60002, 60002, 60003, null, 60005]	[60002, 60003, null, 60005]
 
+-- !select_except_left_const --
+10005	[10005, null, null]	[1, 2, 3]
+10006	[60002, 60002, 60003, null, 60005]	[1, 2, 3]
+
 -- !select_intersect --
 10005	[10005, null, null]	[null]
 10006	[60002, 60002, 60003, null, 60005]	[null]
 
+-- !select_intersect_left_const --
+10005	[10005, null, null]	[null]
+10006	[60002, 60002, 60003, null, 60005]	[null]
+
 -- !select_array_datetimev2_1 --
 1	["2023-01-19 18:11:11.111", "2023-01-19 18:22:22.222", "2023-01-19 18:33:33.333"]	["2023-01-19 18:22:22.222", "2023-01-19 18:33:33.333", "2023-01-19 18:44:44.444"]	["2023-01-19 18:11:11.111111", "2023-01-19 18:22:22.222222", "2023-01-19 18:33:33.333333"]
diff --git a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy
index a4eb69cf4cead7..ac12b1ffccb437 100644
--- a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy
+++ b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy
@@ -272,8 +272,11 @@ suite("test_array_functions") {
     sql """ insert into ${tableName3} values (10006,'bbbbb',[60002,60002,60003,null,60005]) """
 
     qt_select_union "select class_id, student_ids, array_union(student_ids,[1,2,3]) from ${tableName3} order by class_id;"
+    qt_select_union_left_const "select class_id, student_ids, array_union([1,2,3], student_ids,[1,2,3]) from ${tableName3} order by class_id;"
     qt_select_except "select class_id, student_ids, array_except(student_ids,[1,2,3]) from ${tableName3} order by class_id;"
+    qt_select_except_left_const "select class_id, student_ids, array_except([1,2,3], student_ids) from ${tableName3} order by class_id;"
     qt_select_intersect "select class_id, student_ids, array_intersect(student_ids,[1,2,3,null]) from ${tableName3} order by class_id;"
+    qt_select_intersect_left_const "select class_id, student_ids, array_intersect([1,2,3,null], student_ids) from ${tableName3} order by class_id;"
 
     def tableName4 = "tbl_test_array_datetimev2_functions"
 

From 4fe264888d664df2e2f91d78b9a0f5f41c2a838c Mon Sep 17 00:00:00 2001
From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com>
Date: Tue, 25 Jun 2024 22:41:30 +0800
Subject: [PATCH 2/2] [Fix](export/outfile) Support compression when exporting
 data to Parquet / ORC. (#36490)

When using Export/Outfile, the `compress_type` property can be used to
specify the compression method.

The Parquet file format supports the `SNAPPY`, `GZIP`, `BROTLI`, `ZSTD`,
`LZ4` and `PLAIN` compression methods (arrow does not support `LZO` or
`BZ2`); the default is `SNAPPY`. The ORC file format supports the `PLAIN`,
`SNAPPY`, `ZLIB` and `ZSTD` compression methods; the default is `ZLIB`.

Docs: https://github.com/apache/doris-website/pull/764
---
 be/src/pipeline/exec/result_sink_operator.h   |   7 +
 be/src/vec/runtime/vorc_transformer.cpp       |  50 ++--
 be/src/vec/runtime/vorc_transformer.h         |  11 +-
 be/src/vec/runtime/vparquet_transformer.cpp   |  31 +--
 .../iceberg/viceberg_partition_writer.cpp     |  27 +-
 .../vec/sink/writer/vfile_result_writer.cpp   |   6 +-
 .../sink/writer/vhive_partition_writer.cpp    |  27 +-
 .../org/apache/doris/analysis/ExportStmt.java |   7 +
 .../apache/doris/analysis/OutFileClause.java  |  58 ++--
 .../java/org/apache/doris/load/ExportJob.java |   9 +
 .../trees/plans/commands/ExportCommand.java   |   8 +-
 gensrc/thrift/DataSinks.thrift                |   2 +
 .../test_parquet_orc_compression.out          | 248 ++++++++++++++++++
 .../test_parquet_orc_compression.groovy       | 177 +++++++++++++
 14 files changed, 556 insertions(+), 112 deletions(-)
 create mode 100644 regression-test/data/export_p0/test_parquet_orc_compression.out
 create mode 100644 regression-test/suites/export_p0/test_parquet_orc_compression.groovy

diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 1d2490f486d92f..7ec7d43ec2b03a 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -17,6 +17,9 @@
 #pragma once
 
+#include
+#include
+
 #include "operator.h"
 #include "runtime/buffer_control_block.h"
 #include "runtime/result_writer.h"
@@ -49,6 +52,7 @@ struct ResultFileOptions {
     //Now the code version is 1.1.2, so when the version is after 1.2, could remove this code.
bool is_refactor_before_flag = false; std::string orc_schema; + TFileCompressType::type orc_compression_type; bool delete_existing_files = false; std::string file_suffix; @@ -101,6 +105,9 @@ struct ResultFileOptions { if (t_opt.__isset.orc_schema) { orc_schema = t_opt.orc_schema; } + if (t_opt.__isset.orc_compression_type) { + orc_compression_type = t_opt.orc_compression_type; + } } }; diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp index 54c2bb59923ea5..09bae276d65d18 100644 --- a/be/src/vec/runtime/vorc_transformer.cpp +++ b/be/src/vec/runtime/vorc_transformer.cpp @@ -107,39 +107,27 @@ void VOrcOutputStream::set_written_len(int64_t written_len) { } VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, - const std::string& schema, bool output_object_data) - : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), - _file_writer(file_writer), - _write_options(new orc::WriterOptions()), - _schema_str(&schema), - _iceberg_schema(nullptr) { - _write_options->setTimezoneName(_state->timezone()); - _write_options->setUseTightNumericVector(true); -} - -VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, + const VExprContextSPtrs& output_vexpr_ctxs, std::string schema, std::vector column_names, bool output_object_data, - orc::CompressionKind compression, + TFileCompressType::type compress_type, const iceberg::Schema* iceberg_schema) : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), _file_writer(file_writer), _column_names(std::move(column_names)), _write_options(new orc::WriterOptions()), - _schema_str(nullptr), + _schema_str(std::move(schema)), _iceberg_schema(iceberg_schema) { _write_options->setTimezoneName(_state->timezone()); _write_options->setUseTightNumericVector(true); - _write_options->setCompression(compression); + set_compression_type(compress_type); } Status VOrcTransformer::open() { - if (_schema_str != nullptr) { + if (!_schema_str.empty()) { try { - _schema = orc::Type::buildTypeFromString(*_schema_str); + _schema = orc::Type::buildTypeFromString(_schema_str); } catch (const std::exception& e) { - return Status::InternalError("Orc build schema from \"{}\" failed: {}", *_schema_str, + return Status::InternalError("Orc build schema from \"{}\" failed: {}", _schema_str, e.what()); } } else { @@ -171,6 +159,30 @@ Status VOrcTransformer::open() { return Status::OK(); } +void VOrcTransformer::set_compression_type(const TFileCompressType::type& compress_type) { + switch (compress_type) { + case TFileCompressType::PLAIN: { + _write_options->setCompression(orc::CompressionKind::CompressionKind_NONE); + break; + } + case TFileCompressType::SNAPPYBLOCK: { + _write_options->setCompression(orc::CompressionKind::CompressionKind_SNAPPY); + break; + } + case TFileCompressType::ZLIB: { + _write_options->setCompression(orc::CompressionKind::CompressionKind_ZLIB); + break; + } + case TFileCompressType::ZSTD: { + _write_options->setCompression(orc::CompressionKind::CompressionKind_ZSTD); + break; + } + default: { + _write_options->setCompression(orc::CompressionKind::CompressionKind_ZLIB); + } + } +} + std::unique_ptr VOrcTransformer::_build_orc_type( const TypeDescriptor& type_descriptor, const iceberg::NestedField* nested_field) { std::unique_ptr type; diff --git a/be/src/vec/runtime/vorc_transformer.h b/be/src/vec/runtime/vorc_transformer.h index 
554a1401f61866..134e949e76cabd 100644 --- a/be/src/vec/runtime/vorc_transformer.h +++ b/be/src/vec/runtime/vorc_transformer.h @@ -79,13 +79,9 @@ class VOrcOutputStream : public orc::OutputStream { class VOrcTransformer final : public VFileFormatTransformer { public: VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, const std::string& schema, - bool output_object_data); - - VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, + const VExprContextSPtrs& output_vexpr_ctxs, std::string schema, std::vector column_names, bool output_object_data, - orc::CompressionKind compression, + TFileCompressType::type compression, const iceberg::Schema* iceberg_schema = nullptr); ~VOrcTransformer() = default; @@ -99,6 +95,7 @@ class VOrcTransformer final : public VFileFormatTransformer { int64_t written_len() override; private: + void set_compression_type(const TFileCompressType::type& compress_type); std::unique_ptr _build_orc_type(const TypeDescriptor& type_descriptor, const iceberg::NestedField* nested_field); @@ -113,7 +110,7 @@ class VOrcTransformer final : public VFileFormatTransformer { std::vector _column_names; std::unique_ptr _output_stream; std::unique_ptr _write_options; - const std::string* _schema_str; + std::string _schema_str; std::unique_ptr _schema; std::unique_ptr _writer; diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp index fbb24e009688ae..116a898c4f1c39 100644 --- a/be/src/vec/runtime/vparquet_transformer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -147,39 +147,40 @@ void ParquetBuildHelper::build_compression_type( const TParquetCompressionType::type& compression_type) { switch (compression_type) { case TParquetCompressionType::SNAPPY: { - builder.compression(parquet::Compression::SNAPPY); + builder.compression(arrow::Compression::SNAPPY); break; } case TParquetCompressionType::GZIP: { - builder.compression(parquet::Compression::GZIP); + builder.compression(arrow::Compression::GZIP); break; } case TParquetCompressionType::BROTLI: { - builder.compression(parquet::Compression::BROTLI); + builder.compression(arrow::Compression::BROTLI); break; } case TParquetCompressionType::ZSTD: { - builder.compression(parquet::Compression::ZSTD); + builder.compression(arrow::Compression::ZSTD); break; } case TParquetCompressionType::LZ4: { - builder.compression(parquet::Compression::LZ4); - break; - } - case TParquetCompressionType::LZO: { - builder.compression(parquet::Compression::LZO); - break; - } - case TParquetCompressionType::BZ2: { - builder.compression(parquet::Compression::BZ2); + builder.compression(arrow::Compression::LZ4); break; } + // arrow do not support lzo and bz2 compression type. 
+ // case TParquetCompressionType::LZO: { + // builder.compression(arrow::Compression::LZO); + // break; + // } + // case TParquetCompressionType::BZ2: { + // builder.compression(arrow::Compression::BZ2); + // break; + // } case TParquetCompressionType::UNCOMPRESSED: { - builder.compression(parquet::Compression::UNCOMPRESSED); + builder.compression(arrow::Compression::UNCOMPRESSED); break; } default: - builder.compression(parquet::Compression::UNCOMPRESSED); + builder.compression(arrow::Compression::SNAPPY); } } diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp index 59ac69d34f6848..da5758fd604873 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp @@ -87,32 +87,9 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil return _file_format_transformer->open(); } case TFileFormatType::FORMAT_ORC: { - orc::CompressionKind orc_compression_type; - switch (_compress_type) { - case TFileCompressType::PLAIN: { - orc_compression_type = orc::CompressionKind::CompressionKind_NONE; - break; - } - case TFileCompressType::SNAPPYBLOCK: { - orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY; - break; - } - case TFileCompressType::ZLIB: { - orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB; - break; - } - case TFileCompressType::ZSTD: { - orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD; - break; - } - default: { - return Status::InternalError("Unsupported compress type {} with orc", _compress_type); - } - } - _file_format_transformer.reset( - new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs, - _write_column_names, false, orc_compression_type, &_schema)); + new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs, "", + _write_column_names, false, _compress_type, &_schema)); return _file_format_transformer->open(); } default: { diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index c897892cbfc401..ce8f2d18e075fa 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -128,9 +128,9 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) { _file_opts->parquet_version, _output_object_data)); break; case TFileFormatType::FORMAT_ORC: - _vfile_writer.reset(new VOrcTransformer(_state, _file_writer_impl.get(), - _vec_output_expr_ctxs, _file_opts->orc_schema, - _output_object_data)); + _vfile_writer.reset(new VOrcTransformer( + _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->orc_schema, {}, + _output_object_data, _file_opts->orc_compression_type)); break; default: return Status::InternalError("unsupported file format: {}", _file_opts->file_format); diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp index abf1f9007a03e8..1e8b28ef3aeb2c 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.cpp +++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp @@ -103,32 +103,9 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) return _file_format_transformer->open(); } case TFileFormatType::FORMAT_ORC: { - orc::CompressionKind orc_compression_type; - switch (_hive_compress_type) { - case TFileCompressType::PLAIN: { - orc_compression_type = orc::CompressionKind::CompressionKind_NONE; - 
break; - } - case TFileCompressType::SNAPPYBLOCK: { - orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY; - break; - } - case TFileCompressType::ZLIB: { - orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB; - break; - } - case TFileCompressType::ZSTD: { - orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD; - break; - } - default: { - return Status::InternalError("Unsupported type {} with orc", _hive_compress_type); - } - } - _file_format_transformer.reset( - new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs, - _write_column_names, false, orc_compression_type)); + new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs, "", + _write_column_names, false, _hive_compress_type)); return _file_format_transformer->open(); } default: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 3efda3bf8f8fc5..855379cbc37e71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -64,6 +64,7 @@ public class ExportStmt extends StatementBase { public static final String PARALLELISM = "parallelism"; public static final String LABEL = "label"; public static final String DATA_CONSISTENCY = "data_consistency"; + public static final String COMPRESS_TYPE = "compress_type"; private static final String DEFAULT_COLUMN_SEPARATOR = "\t"; private static final String DEFAULT_LINE_DELIMITER = "\n"; @@ -81,6 +82,7 @@ public class ExportStmt extends StatementBase { .add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER) .add(PropertyAnalyzer.PROPERTIES_TIMEOUT) .add("format") + .add(COMPRESS_TYPE) .build(); private TableName tblName; @@ -107,6 +109,7 @@ public class ExportStmt extends StatementBase { private String deleteExistingFiles; private String withBom; private String dataConsistency = ExportJob.CONSISTENT_PARTITION; + private String compressionType; private SessionVariable sessionVariables; private String qualifiedUser; @@ -234,6 +237,7 @@ private void setJob() throws UserException { exportJob.setDeleteExistingFiles(this.deleteExistingFiles); exportJob.setWithBom(this.withBom); exportJob.setDataConsistency(this.dataConsistency); + exportJob.setCompressType(this.compressionType); if (columns != null) { Splitter split = Splitter.on(',').trimResults().omitEmptyStrings(); @@ -376,6 +380,9 @@ private void checkProperties(Map properties) throws UserExceptio + ExportJob.CONSISTENT_PARTITION + "`/`" + ExportJob.CONSISTENT_NONE + "`"); } } + + // compress_type + this.compressionType = properties.getOrDefault(COMPRESS_TYPE, ""); } private void checkColumns() throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 3f4749ceb2d4ef..a658bec93af519 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -37,6 +37,7 @@ import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TParquetCompressionType; import org.apache.doris.thrift.TParquetDataType; @@ -70,6 +71,7 @@ public class 
OutFileClause { public static final Map PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap(); public static final Map PARQUET_DATA_TYPE_MAP = Maps.newHashMap(); public static final Map PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap(); + public static final Map ORC_COMPRESSION_TYPE_MAP = Maps.newHashMap(); public static final Map PARQUET_VERSION_MAP = Maps.newHashMap(); public static final Set ORC_DATA_TYPE = Sets.newHashSet(); public static final String FILE_NUMBER = "FileNumber"; @@ -106,9 +108,15 @@ public class OutFileClause { PARQUET_COMPRESSION_TYPE_MAP.put("brotli", TParquetCompressionType.BROTLI); PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD); PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4); - PARQUET_COMPRESSION_TYPE_MAP.put("lzo", TParquetCompressionType.LZO); - PARQUET_COMPRESSION_TYPE_MAP.put("bz2", TParquetCompressionType.BZ2); - PARQUET_COMPRESSION_TYPE_MAP.put("default", TParquetCompressionType.UNCOMPRESSED); + // arrow do not support lzo and bz2 compression type. + // PARQUET_COMPRESSION_TYPE_MAP.put("lzo", TParquetCompressionType.LZO); + // PARQUET_COMPRESSION_TYPE_MAP.put("bz2", TParquetCompressionType.BZ2); + PARQUET_COMPRESSION_TYPE_MAP.put("plain", TParquetCompressionType.UNCOMPRESSED); + + ORC_COMPRESSION_TYPE_MAP.put("plain", TFileCompressType.PLAIN); + ORC_COMPRESSION_TYPE_MAP.put("snappy", TFileCompressType.SNAPPYBLOCK); + ORC_COMPRESSION_TYPE_MAP.put("zlib", TFileCompressType.ZLIB); + ORC_COMPRESSION_TYPE_MAP.put("zstd", TFileCompressType.ZSTD); PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0); PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST); @@ -137,6 +145,7 @@ public class OutFileClause { public static final String PROP_DELETE_EXISTING_FILES = "delete_existing_files"; public static final String PROP_FILE_SUFFIX = "file_suffix"; public static final String PROP_WITH_BOM = "with_bom"; + public static final String COMPRESS_TYPE = "compress_type"; private static final String PARQUET_PROP_PREFIX = "parquet."; private static final String SCHEMA = "schema"; @@ -170,8 +179,8 @@ public class OutFileClause { private boolean isAnalyzed = false; private String headerType = ""; - private static final String PARQUET_COMPRESSION = "compression"; - private TParquetCompressionType parquetCompressionType = TParquetCompressionType.UNCOMPRESSED; + private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY; + private TFileCompressType orcCompressionType = TFileCompressType.ZLIB; private static final String PARQUET_DISABLE_DICTIONARY = "disable_dictionary"; private boolean parquetDisableDictionary = false; private static final String PARQUET_VERSION = "version"; @@ -670,19 +679,11 @@ public static String getFsName(String path) { return fullPath.replace(filePath, ""); } - void setParquetCompressionType(String propertyValue) { - if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(propertyValue)) { - this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(propertyValue); - } else { - LOG.warn("not set parquet compression type or is invalid, set default to UNCOMPRESSED type."); - } - } - void setParquetVersion(String propertyValue) { if (PARQUET_VERSION_MAP.containsKey(propertyValue)) { this.parquetVersion = PARQUET_VERSION_MAP.get(propertyValue); } else { - LOG.warn("not set parquet version type or is invalid, set default to PARQUET_1.0 version."); + LOG.debug("not set parquet version type or is invalid, set default to PARQUET_1.0 version."); } } @@ -698,15 +699,25 @@ void 
setParquetVersion(String propertyValue) { * currently only supports: compression, disable_dictionary, version */ private void getParquetProperties(Set processedPropKeys) throws AnalysisException { + // save compress type + if (properties.containsKey(COMPRESS_TYPE)) { + if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) { + this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get( + properties.get(COMPRESS_TYPE).toLowerCase()); + processedPropKeys.add(COMPRESS_TYPE); + } else { + throw new AnalysisException("parquet compression type [" + properties.get(COMPRESS_TYPE) + + "] is invalid, please choose one among SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2 or PLAIN"); + } + } + // save all parquet prefix property Iterator> iter = properties.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = iter.next(); if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) { processedPropKeys.add(entry.getKey()); - if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_COMPRESSION)) { - setParquetCompressionType(entry.getValue()); - } else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_DISABLE_DICTIONARY)) { + if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_DISABLE_DICTIONARY)) { this.parquetDisableDictionary = Boolean.valueOf(entry.getValue()); } else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_VERSION)) { setParquetVersion(entry.getValue()); @@ -750,6 +761,18 @@ private void getParquetProperties(Set processedPropKeys) throws Analysis } private void getOrcProperties(Set processedPropKeys) throws AnalysisException { + // get compression type + // save compress type + if (properties.containsKey(COMPRESS_TYPE)) { + if (ORC_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) { + this.orcCompressionType = ORC_COMPRESSION_TYPE_MAP.get(properties.get(COMPRESS_TYPE).toLowerCase()); + processedPropKeys.add(COMPRESS_TYPE); + } else { + throw new AnalysisException("orc compression type [" + properties.get(COMPRESS_TYPE) + "] is invalid," + + " please choose one among ZLIB, SNAPPY, ZSTD or PLAIN"); + } + } + // check schema. 
if schema is not set, Doris will gen schema by select items String schema = properties.get(SCHEMA); if (schema == null) { @@ -852,6 +875,7 @@ public TResultFileSinkOptions toSinkOptions() { } if (isOrcFormat()) { sinkOptions.setOrcSchema(serializeOrcSchema()); + sinkOptions.setOrcCompressionType(orcCompressionType); } return sinkOptions; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 299100e9c318bc..2d84023938e620 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -61,6 +61,7 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.plans.commands.ExportCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy; import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; @@ -173,6 +174,8 @@ public class ExportJob implements Writable { private String withBom; @SerializedName("dataConsistency") private String dataConsistency; + @SerializedName("compressType") + private String compressType; private TableRef tableRef; @@ -619,6 +622,12 @@ private Map convertOutfileProperties() { if (format.equals("csv") || format.equals("csv_with_names") || format.equals("csv_with_names_and_types")) { outfileProperties.put(OutFileClause.PROP_COLUMN_SEPARATOR, columnSeparator); outfileProperties.put(OutFileClause.PROP_LINE_DELIMITER, lineDelimiter); + } else { + // orc / parquet + // compressType == null means outfile will use default compression type + if (compressType != null) { + outfileProperties.put(ExportCommand.COMPRESS_TYPE, compressType); + } } if (!maxFileSize.isEmpty()) { outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java index f89145ec45cdaa..263bf43e355dc6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java @@ -74,6 +74,7 @@ public class ExportCommand extends Command implements ForwardWithSync { public static final String PARALLELISM = "parallelism"; public static final String LABEL = "label"; public static final String DATA_CONSISTENCY = "data_consistency"; + public static final String COMPRESS_TYPE = "compress_type"; private static final String DEFAULT_COLUMN_SEPARATOR = "\t"; private static final String DEFAULT_LINE_DELIMITER = "\n"; private static final String DEFAULT_PARALLELISM = "1"; @@ -91,6 +92,7 @@ public class ExportCommand extends Command implements ForwardWithSync { .add(PropertyAnalyzer.PROPERTIES_TIMEOUT) .add("format") .add(OutFileClause.PROP_WITH_BOM) + .add(COMPRESS_TYPE) .build(); private final List nameParts; @@ -337,9 +339,13 @@ private ExportJob generateExportJob(ConnectContext ctx, Map file } catch (NumberFormatException e) { throw new UserException("The value of timeout is invalid!"); } - exportJob.setTimeoutSecond(timeoutSecond); + // set compress_type + if (fileProperties.containsKey(COMPRESS_TYPE)) { + 
exportJob.setCompressType(fileProperties.get(COMPRESS_TYPE)); + } + // exportJob generate outfile sql exportJob.generateOutfileLogicalPlans(RelationUtil.getQualifierName(ctx, this.nameParts)); return exportJob; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 954b240c649488..d509e9816e7cce 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -131,6 +131,8 @@ struct TResultFileSinkOptions { 16: optional bool delete_existing_files; 17: optional string file_suffix; 18: optional bool with_bom; + + 19: optional PlanNodes.TFileCompressType orc_compression_type; } struct TMemoryScratchSink { diff --git a/regression-test/data/export_p0/test_parquet_orc_compression.out b/regression-test/data/export_p0/test_parquet_orc_compression.out new file mode 100644 index 00000000000000..7d258b47e3b745 --- /dev/null +++ b/regression-test/data/export_p0/test_parquet_orc_compression.out @@ -0,0 +1,248 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_export1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 
2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + +-- !select_load1 -- +1 2017-10-01 +10 2017-10-01 +11 2017-10-01 +2 2017-10-01 +3 2017-10-01 +4 2017-10-01 +5 2017-10-01 +6 2017-10-01 +7 2017-10-01 +8 2017-10-01 +9 2017-10-01 + diff --git a/regression-test/suites/export_p0/test_parquet_orc_compression.groovy b/regression-test/suites/export_p0/test_parquet_orc_compression.groovy new file mode 100644 index 00000000000000..c3a8f3f8bbb64f --- /dev/null +++ b/regression-test/suites/export_p0/test_parquet_orc_compression.groovy @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_parquet_orc_compression", "p0") { + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + + def table_export_name = "test_parquet_orc_compression" + def outfile_path_prefix = """${bucket}/export/test_parquet_orc_compression/exp_""" + + sql """ DROP TABLE IF EXISTS ${table_export_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_export_name} ( + `user_id` INT NOT NULL COMMENT "用户id", + `date` DATE NOT NULL COMMENT "数据灌入日期时间" + ) + DISTRIBUTED BY HASH(user_id) + PROPERTIES("replication_num" = "1"); + """ + StringBuilder sb = new StringBuilder() + int i = 1 + for (; i < 11; i ++) { + sb.append(""" + (${i}, '2017-10-01'), + """) + } + sb.append(""" + (${i}, '2017-10-01') + """) + sql """ INSERT INTO ${table_export_name} VALUES + ${sb.toString()} + """ + def insert_res = sql "show last insert;" + logger.info("insert result: " + insert_res.toString()) + order_qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """ + + + def waiting_export = { export_label -> + while (true) { + def res = sql """ show export where label = "${export_label}";""" + logger.info("export state: " + res[0][2]) + if (res[0][2] == "FINISHED") { + def json = parseJson(res[0][11]) + assert json instanceof List + assertEquals("1", json.fileNumber[0][0]) + log.info("outfile_path: ${json.url[0][0]}") + return json.url[0][0]; + } else if (res[0][2] == "CANCELLED") { + throw new IllegalStateException("""export failed: ${res[0][10]}""") + } else { + sleep(5000) + } + } + } + + // export compression + def export_compression = { file_format, compression_type, tvf_read -> + def uuid = UUID.randomUUID().toString() + def outFilePath = """${outfile_path_prefix}_${uuid}""" + def label = "label_${uuid}" + try { + // exec export + sql """ + EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}/" + PROPERTIES( + "label" = "${label}", + "format" = "${file_format}", + "compress_type" = "${compression_type}" + ) + WITH S3( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + + if (tvf_read) { + def outfile_url = waiting_export.call(label) + + order_qt_select_load1 """ select * from s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${file_format}", + "s3.access_key"= "${ak}", + "s3.secret_key" = "${sk}", + "format" = "${file_format}", + "region" = "${region}" + ) ORDER BY user_id; + """ + } + } finally { + } + } + + // outfile compression + def outfile_compression = { file_format, compression_type, tvf_read -> + def uuid = UUID.randomUUID().toString() + def outFilePath = """${outfile_path_prefix}_${uuid}""" + + def res = sql """ + SELECT * FROM ${table_export_name} t ORDER BY user_id + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS ${file_format} + PROPERTIES ( + "compress_type" = "${compression_type}", + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + + if (tvf_read) { + def outfile_url = res[0][3] + order_qt_select_load1 """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${file_format}", + "ACCESS_KEY"= "${ak}", + 
"SECRET_KEY" = "${sk}", + "format" = "${file_format}", + "region" = "${region}" + ); + """ + } + } + + // 1. export + // 1.1 parquet + export_compression("parquet", "snappy", true) + export_compression("parquet", "GZIP", true) + // parquet-read do not support read BROTLI compression type now + export_compression("parquet", "BROTLI", false) + export_compression("parquet", "ZSTD", true) + export_compression("parquet", "LZ4", true) + export_compression("parquet", "plain", true) + // 1.2 orc + export_compression("orc", "PLAIN", true) + export_compression("orc", "SNAPPY", true) + export_compression("orc", "ZLIB", true) + export_compression("orc", "ZSTD", true) + + // 2. outfile + // parquet + outfile_compression("parquet", "snappy", true) + outfile_compression("parquet", "GZIP", true) + // parquet-read do not support read BROTLI compression type now + outfile_compression("parquet", "BROTLI", false) + outfile_compression("parquet", "ZSTD", true) + outfile_compression("parquet", "LZ4", true) + outfile_compression("parquet", "plain", true) + // orc + outfile_compression("orc", "PLAIN", true) + outfile_compression("orc", "SNAPPY", true) + outfile_compression("orc", "ZLIB", true) + outfile_compression("orc", "ZSTD", true) +}