Merge branch 'master' into 20240620_arrow_flight_regressiontest
xinyiZzz authored Jun 26, 2024
2 parents 22fb9da + 4fe2648 commit 8e900be
Showing 17 changed files with 576 additions and 113 deletions.
7 changes: 7 additions & 0 deletions be/src/pipeline/exec/result_sink_operator.h
@@ -17,6 +17,9 @@

#pragma once

+#include <gen_cpp/PlanNodes_types.h>
+#include <stdint.h>
+
#include "operator.h"
#include "runtime/buffer_control_block.h"
#include "runtime/result_writer.h"
@@ -49,6 +52,7 @@ struct ResultFileOptions {
// Now the code version is 1.1.2, so this code can be removed once the version is past 1.2.
bool is_refactor_before_flag = false;
std::string orc_schema;
+TFileCompressType::type orc_compression_type;

bool delete_existing_files = false;
std::string file_suffix;
@@ -101,6 +105,9 @@ struct ResultFileOptions {
if (t_opt.__isset.orc_schema) {
orc_schema = t_opt.orc_schema;
}
+if (t_opt.__isset.orc_compression_type) {
+orc_compression_type = t_opt.orc_compression_type;
+}
}
};

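A note on the pattern: the new orc_compression_type field is copied only when the Thrift __isset flag reports that the optional field was populated, and, unlike the other members of ResultFileOptions, it is declared without a default initializer, so it stays indeterminate when the field is absent. A minimal, self-contained sketch of the idiom; TOptions and ResultFileOptionsSketch are hypothetical stand-ins, not the generated Doris types:

// Hypothetical stand-ins for the Thrift-generated types.
struct TOptionsIsset {
    bool orc_compression_type = false;
};
struct TOptions {
    int orc_compression_type = 0; // TFileCompressType::type in the real code
    TOptionsIsset __isset;
};

struct ResultFileOptionsSketch {
    // Default-initialized here, so the value is well defined even when
    // the optional Thrift field was never set.
    int orc_compression_type = 0;

    explicit ResultFileOptionsSketch(const TOptions& t_opt) {
        if (t_opt.__isset.orc_compression_type) {
            orc_compression_type = t_opt.orc_compression_type;
        }
    }
};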
6 changes: 5 additions & 1 deletion be/src/vec/functions/array/function_array_set.h
@@ -181,7 +181,11 @@ struct ArraySetImpl {
constexpr auto execute_left_column_first = Impl::Action::execute_left_column_first;
size_t current = 0;
Impl impl;
-for (size_t row = 0; row < left_data.offsets_ptr->size(); ++row) {
+size_t row_size = left_data.offsets_ptr->size();
+if constexpr (LCONST) {
+row_size = right_data.offsets_ptr->size();
+}
+for (size_t row = 0; row < row_size; ++row) {
size_t count = 0;
size_t left_off = (*left_data.offsets_ptr)[index_check_const(row, LCONST) - 1];
size_t left_len = (*left_data.offsets_ptr)[index_check_const(row, LCONST)] - left_off;
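What this hunk fixes: a constant (LCONST) left column materializes only one physical row, so left_data.offsets_ptr->size() is 1 and the old loop emitted a single output row no matter how many rows the right column had. The bound now comes from the right column in that case, while index_check_const keeps pinning reads of the constant side to row 0. A minimal sketch of the indexing idea, assuming index_check_const simply maps every row to 0 for a constant column:

#include <cstddef>
#include <iostream>
#include <vector>

// Assumed behavior: a constant column has one physical row, so lookups
// into it are pinned to index 0.
constexpr std::size_t index_check_const(std::size_t row, bool is_const) {
    return is_const ? 0 : row;
}

int main() {
    std::vector<std::size_t> left_offsets{2};        // constant column: one array
    std::vector<std::size_t> right_offsets{1, 3, 6}; // full column: three arrays
    constexpr bool LCONST = true;

    // The fix: take the row count from the non-constant side.
    std::size_t row_size = LCONST ? right_offsets.size() : left_offsets.size();
    for (std::size_t row = 0; row < row_size; ++row) {
        std::cout << "output row " << row << " reads left physical row "
                  << index_check_const(row, LCONST) << "\n";
    }
    return 0;
}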
50 changes: 31 additions & 19 deletions be/src/vec/runtime/vorc_transformer.cpp
@@ -107,39 +107,27 @@ void VOrcOutputStream::set_written_len(int64_t written_len) {
}

-VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
-const VExprContextSPtrs& output_vexpr_ctxs,
-const std::string& schema, bool output_object_data)
-: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
-_file_writer(file_writer),
-_write_options(new orc::WriterOptions()),
-_schema_str(&schema),
-_iceberg_schema(nullptr) {
-_write_options->setTimezoneName(_state->timezone());
-_write_options->setUseTightNumericVector(true);
-}
-
VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
-const VExprContextSPtrs& output_vexpr_ctxs,
+const VExprContextSPtrs& output_vexpr_ctxs, std::string schema,
std::vector<std::string> column_names, bool output_object_data,
-orc::CompressionKind compression,
+TFileCompressType::type compress_type,
const iceberg::Schema* iceberg_schema)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_file_writer(file_writer),
_column_names(std::move(column_names)),
_write_options(new orc::WriterOptions()),
-_schema_str(nullptr),
+_schema_str(std::move(schema)),
_iceberg_schema(iceberg_schema) {
_write_options->setTimezoneName(_state->timezone());
_write_options->setUseTightNumericVector(true);
-_write_options->setCompression(compression);
+set_compression_type(compress_type);
}

Status VOrcTransformer::open() {
-if (_schema_str != nullptr) {
+if (!_schema_str.empty()) {
try {
-_schema = orc::Type::buildTypeFromString(*_schema_str);
+_schema = orc::Type::buildTypeFromString(_schema_str);
} catch (const std::exception& e) {
-return Status::InternalError("Orc build schema from \"{}\" failed: {}", *_schema_str,
+return Status::InternalError("Orc build schema from \"{}\" failed: {}", _schema_str,
e.what());
}
} else {
@@ -171,6 +159,30 @@ Status VOrcTransformer::open() {
return Status::OK();
}

+void VOrcTransformer::set_compression_type(const TFileCompressType::type& compress_type) {
+switch (compress_type) {
+case TFileCompressType::PLAIN: {
+_write_options->setCompression(orc::CompressionKind::CompressionKind_NONE);
+break;
+}
+case TFileCompressType::SNAPPYBLOCK: {
+_write_options->setCompression(orc::CompressionKind::CompressionKind_SNAPPY);
+break;
+}
+case TFileCompressType::ZLIB: {
+_write_options->setCompression(orc::CompressionKind::CompressionKind_ZLIB);
+break;
+}
+case TFileCompressType::ZSTD: {
+_write_options->setCompression(orc::CompressionKind::CompressionKind_ZSTD);
+break;
+}
+default: {
+_write_options->setCompression(orc::CompressionKind::CompressionKind_ZLIB);
+}
+}
+}
+
std::unique_ptr<orc::Type> VOrcTransformer::_build_orc_type(
const TypeDescriptor& type_descriptor, const iceberg::NestedField* nested_field) {
std::unique_ptr<orc::Type> type;
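The TFileCompressType-to-orc::CompressionKind mapping that both partition writers used to duplicate (see the two deletions below) now lives in this single member function. One behavioral difference worth flagging: the old call-site switches returned Status::InternalError on an unsupported type, whereas the default branch here silently falls back to ZLIB. A standalone sketch of the mapping, using simplified stand-in enums rather than the real Doris/ORC headers:

#include <iostream>

// Simplified stand-ins for the Doris and ORC enums (assumption: reduced
// to the values the diff handles, plus one unsupported type).
enum class TFileCompressTypeSketch { PLAIN, SNAPPYBLOCK, ZLIB, ZSTD, LZ4BLOCK };
enum class CompressionKindSketch { NONE, SNAPPY, ZLIB, ZSTD };

CompressionKindSketch to_orc_compression(TFileCompressTypeSketch t) {
    switch (t) {
    case TFileCompressTypeSketch::PLAIN:
        return CompressionKindSketch::NONE;
    case TFileCompressTypeSketch::SNAPPYBLOCK:
        return CompressionKindSketch::SNAPPY;
    case TFileCompressTypeSketch::ZLIB:
        return CompressionKindSketch::ZLIB;
    case TFileCompressTypeSketch::ZSTD:
        return CompressionKindSketch::ZSTD;
    default:
        // The old call sites returned an error here; the new member
        // function falls back to ZLIB instead.
        return CompressionKindSketch::ZLIB;
    }
}

int main() {
    std::cout << static_cast<int>(to_orc_compression(TFileCompressTypeSketch::LZ4BLOCK))
              << "\n"; // prints 2 (ZLIB): the silent fallback
}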
11 changes: 4 additions & 7 deletions be/src/vec/runtime/vorc_transformer.h
@@ -79,13 +79,9 @@ class VOrcOutputStream : public orc::OutputStream {
class VOrcTransformer final : public VFileFormatTransformer {
public:
VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
-const VExprContextSPtrs& output_vexpr_ctxs, const std::string& schema,
-bool output_object_data);
-
-VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
-const VExprContextSPtrs& output_vexpr_ctxs,
+const VExprContextSPtrs& output_vexpr_ctxs, std::string schema,
std::vector<std::string> column_names, bool output_object_data,
-orc::CompressionKind compression,
+TFileCompressType::type compression,
const iceberg::Schema* iceberg_schema = nullptr);

~VOrcTransformer() = default;
@@ -99,6 +95,7 @@ class VOrcTransformer final : public VFileFormatTransformer {
int64_t written_len() override;

private:
+void set_compression_type(const TFileCompressType::type& compress_type);
std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor,
const iceberg::NestedField* nested_field);

Expand All @@ -113,7 +110,7 @@ class VOrcTransformer final : public VFileFormatTransformer {
std::vector<std::string> _column_names;
std::unique_ptr<orc::OutputStream> _output_stream;
std::unique_ptr<orc::WriterOptions> _write_options;
-const std::string* _schema_str;
+std::string _schema_str;
std::unique_ptr<orc::Type> _schema;
std::unique_ptr<orc::Writer> _writer;

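Side note on the header change: switching _schema_str from const std::string* to an owned std::string both collapses the two constructors into one and removes a lifetime hazard. The old design stored a pointer into the caller's string, which dangles if the transformer outlives it, and it needed nullptr as the "no schema" sentinel where the new code can simply use an empty string. A short illustration with hypothetical stand-in types:

#include <iostream>
#include <string>

struct BorrowingTransformer {
    const std::string* schema_str = nullptr; // old design: borrows
};
struct OwningTransformer {
    std::string schema_str; // new design: owns; "" means "no schema"
};

int main() {
    OwningTransformer owning;
    BorrowingTransformer borrowing;
    {
        std::string schema = "struct<a:int,b:string>";
        borrowing.schema_str = &schema; // points into a shorter-lived string
        owning.schema_str = schema;     // copied; lifetimes are decoupled
    } // `schema` is destroyed: borrowing.schema_str now dangles
    std::cout << owning.schema_str << "\n"; // fine
    // std::cout << *borrowing.schema_str;  // would be undefined behavior
    return 0;
}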
31 changes: 16 additions & 15 deletions be/src/vec/runtime/vparquet_transformer.cpp
@@ -147,39 +147,40 @@ void ParquetBuildHelper::build_compression_type(
const TParquetCompressionType::type& compression_type) {
switch (compression_type) {
case TParquetCompressionType::SNAPPY: {
-builder.compression(parquet::Compression::SNAPPY);
+builder.compression(arrow::Compression::SNAPPY);
break;
}
case TParquetCompressionType::GZIP: {
-builder.compression(parquet::Compression::GZIP);
+builder.compression(arrow::Compression::GZIP);
break;
}
case TParquetCompressionType::BROTLI: {
-builder.compression(parquet::Compression::BROTLI);
+builder.compression(arrow::Compression::BROTLI);
break;
}
case TParquetCompressionType::ZSTD: {
-builder.compression(parquet::Compression::ZSTD);
+builder.compression(arrow::Compression::ZSTD);
break;
}
case TParquetCompressionType::LZ4: {
-builder.compression(parquet::Compression::LZ4);
-break;
-}
-case TParquetCompressionType::LZO: {
-builder.compression(parquet::Compression::LZO);
-break;
-}
-case TParquetCompressionType::BZ2: {
-builder.compression(parquet::Compression::BZ2);
+builder.compression(arrow::Compression::LZ4);
break;
}
+// arrow does not support the LZO and BZ2 compression types.
+// case TParquetCompressionType::LZO: {
+// builder.compression(arrow::Compression::LZO);
+// break;
+// }
+// case TParquetCompressionType::BZ2: {
+// builder.compression(arrow::Compression::BZ2);
+// break;
+// }
case TParquetCompressionType::UNCOMPRESSED: {
-builder.compression(parquet::Compression::UNCOMPRESSED);
+builder.compression(arrow::Compression::UNCOMPRESSED);
break;
}
default:
-builder.compression(parquet::Compression::UNCOMPRESSED);
+builder.compression(arrow::Compression::SNAPPY);
}
}

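In recent Arrow C++ releases parquet::Compression is an alias for arrow::Compression, so the rename above is cosmetic; the substantive changes are that LZO and BZ2 are dropped (Arrow's Parquet writer does not implement them) and the default fallback becomes SNAPPY instead of UNCOMPRESSED. A sketch for probing what a given Arrow build actually supports, assuming Arrow's arrow::util::Codec::IsAvailable and GetCodecAsString helpers:

#include <iostream>

#include <arrow/util/compression.h>

int main() {
    // Probe codec support before handing a compression type to the
    // Parquet writer-properties builder.
    for (auto codec : {arrow::Compression::SNAPPY, arrow::Compression::GZIP,
                       arrow::Compression::BROTLI, arrow::Compression::ZSTD,
                       arrow::Compression::LZ4, arrow::Compression::LZO,
                       arrow::Compression::BZ2}) {
        std::cout << arrow::util::Codec::GetCodecAsString(codec) << ": "
                  << (arrow::util::Codec::IsAvailable(codec) ? "available"
                                                             : "not supported")
                  << "\n";
    }
    return 0;
}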
27 changes: 2 additions & 25 deletions be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -87,32 +87,9 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
-orc::CompressionKind orc_compression_type;
-switch (_compress_type) {
-case TFileCompressType::PLAIN: {
-orc_compression_type = orc::CompressionKind::CompressionKind_NONE;
-break;
-}
-case TFileCompressType::SNAPPYBLOCK: {
-orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY;
-break;
-}
-case TFileCompressType::ZLIB: {
-orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
-break;
-}
-case TFileCompressType::ZSTD: {
-orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD;
-break;
-}
-default: {
-return Status::InternalError("Unsupported compress type {} with orc", _compress_type);
-}
-}
-
_file_format_transformer.reset(
-new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs,
-_write_column_names, false, orc_compression_type, &_schema));
+new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs, "",
+_write_column_names, false, _compress_type, &_schema));
return _file_format_transformer->open();
}
default: {
6 changes: 3 additions & 3 deletions be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -128,9 +128,9 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
_file_opts->parquet_version, _output_object_data));
break;
case TFileFormatType::FORMAT_ORC:
-_vfile_writer.reset(new VOrcTransformer(_state, _file_writer_impl.get(),
-_vec_output_expr_ctxs, _file_opts->orc_schema,
-_output_object_data));
+_vfile_writer.reset(new VOrcTransformer(
+_state, _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->orc_schema, {},
+_output_object_data, _file_opts->orc_compression_type));
break;
default:
return Status::InternalError("unsupported file format: {}", _file_opts->file_format);
27 changes: 2 additions & 25 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -103,32 +103,9 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
-orc::CompressionKind orc_compression_type;
-switch (_hive_compress_type) {
-case TFileCompressType::PLAIN: {
-orc_compression_type = orc::CompressionKind::CompressionKind_NONE;
-break;
-}
-case TFileCompressType::SNAPPYBLOCK: {
-orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY;
-break;
-}
-case TFileCompressType::ZLIB: {
-orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
-break;
-}
-case TFileCompressType::ZSTD: {
-orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD;
-break;
-}
-default: {
-return Status::InternalError("Unsupported type {} with orc", _hive_compress_type);
-}
-}
-
_file_format_transformer.reset(
-new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs,
-_write_column_names, false, orc_compression_type));
+new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs, "",
+_write_column_names, false, _hive_compress_type));
return _file_format_transformer->open();
}
default: {
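Both partition writers now hand VOrcTransformer the raw Thrift compress type plus an empty schema string, delegating both the compression mapping (see set_compression_type above) and schema construction. Per the open() logic shown earlier, a non-empty string is parsed with orc::Type::buildTypeFromString, and "" means "derive the ORC schema from the output expressions". A toy sketch of that sentinel, with stand-in names rather than the Doris API:

#include <iostream>
#include <string>

struct TransformerSketch {
    std::string schema_str; // "" acts as the "no explicit schema" sentinel

    void open() const {
        if (!schema_str.empty()) {
            std::cout << "parse schema string: " << schema_str << "\n";
        } else {
            std::cout << "derive schema from output expressions\n";
        }
    }
};

int main() {
    TransformerSketch{""}.open();              // hive/iceberg writer path
    TransformerSketch{"struct<a:int>"}.open(); // explicit-schema path
    return 0;
}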
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -64,6 +64,7 @@ public class ExportStmt extends StatementBase {
public static final String PARALLELISM = "parallelism";
public static final String LABEL = "label";
public static final String DATA_CONSISTENCY = "data_consistency";
+public static final String COMPRESS_TYPE = "compress_type";

private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
@@ -81,6 +82,7 @@ public class ExportStmt extends StatementBase {
.add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
.add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
.add("format")
+.add(COMPRESS_TYPE)
.build();

private TableName tblName;
@@ -107,6 +109,7 @@ public class ExportStmt extends StatementBase {
private String deleteExistingFiles;
private String withBom;
private String dataConsistency = ExportJob.CONSISTENT_PARTITION;
+private String compressionType;
private SessionVariable sessionVariables;

private String qualifiedUser;
@@ -234,6 +237,7 @@ private void setJob() throws UserException {
exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
exportJob.setWithBom(this.withBom);
exportJob.setDataConsistency(this.dataConsistency);
+exportJob.setCompressType(this.compressionType);

if (columns != null) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
@@ -376,6 +380,9 @@ private void checkProperties(Map<String, String> properties) throws UserException {
+ ExportJob.CONSISTENT_PARTITION + "`/`" + ExportJob.CONSISTENT_NONE + "`");
}
}

+// compress_type
+this.compressionType = properties.getOrDefault(COMPRESS_TYPE, "");
}

private void checkColumns() throws DdlException {
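FE side of the same feature: the new compress_type property is accepted in ExportStmt, stored as a raw string (getOrDefault with an empty default, so no validation happens in this hunk), and passed to ExportJob.setCompressType. Presumably it is later translated into the TFileCompressType value that reaches the BE as the orc_compression_type Thrift field handled in result_sink_operator.h above; that translation happens outside the hunks shown here.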