From c834721bb1c77ea6e04de0d0e4c021db3ba22d4d Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Sat, 24 Sep 2022 17:21:19 +0800 Subject: [PATCH] [fix](new-scan)Fix new scanner load job bugs (#12903) Fix bugs: 1. Fe need to send file format (e.g. parquet, orc ...) to be while processing load jobs using new scanner. 2. Try to get parquet file column type from SchemaElement.type before getting from Logical type and Converted type. --- .../vec/exec/format/parquet/schema_desc.cpp | 30 +++++++++++++++++-- be/src/vec/exec/format/parquet/schema_desc.h | 2 ++ .../planner/external/LoadScanProvider.java | 1 + 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp index b0d449f604a5a0..04ed26af7cad59 100644 --- a/be/src/vec/exec/format/parquet/schema_desc.cpp +++ b/be/src/vec/exec/format/parquet/schema_desc.cpp @@ -152,11 +152,37 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic physical_field->physical_type = physical_schema.type; _physical_fields.push_back(physical_field); physical_field->physical_column_index = _physical_fields.size() - 1; + physical_field->type = get_doris_type(physical_schema); +} + +TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& physical_schema) { + TypeDescriptor type; + switch (physical_schema.type) { + case tparquet::Type::BOOLEAN: + type.type = TYPE_BOOLEAN; + return type; + case tparquet::Type::INT32: + type.type = TYPE_INT; + return type; + case tparquet::Type::INT64: + case tparquet::Type::INT96: + type.type = TYPE_BIGINT; + return type; + case tparquet::Type::FLOAT: + type.type = TYPE_FLOAT; + return type; + case tparquet::Type::DOUBLE: + type.type = TYPE_DOUBLE; + return type; + default: + break; + } if (physical_schema.__isset.logicalType) { - physical_field->type = convert_to_doris_type(physical_schema.logicalType); + type = convert_to_doris_type(physical_schema.logicalType); } else if (physical_schema.__isset.converted_type) { - physical_field->type = convert_to_doris_type(physical_schema.converted_type); + type = convert_to_doris_type(physical_schema.converted_type); } + return type; } TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) { diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h index 12db2b70117414..7f69cc6559235a 100644 --- a/be/src/vec/exec/format/parquet/schema_desc.h +++ b/be/src/vec/exec/format/parquet/schema_desc.h @@ -82,6 +82,8 @@ class FieldDescriptor { TypeDescriptor convert_to_doris_type(tparquet::ConvertedType::type convertedType); + TypeDescriptor get_doris_type(const tparquet::SchemaElement& physical_schema); + public: FieldDescriptor() = default; ~FieldDescriptor() = default; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index 5f791186a2fd65..33b0db2de75b3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -93,6 +93,7 @@ public ParamCreateContext createContext(Analyzer analyzer) throws UserException ctx.timezone = analyzer.getTimezone(); TFileScanRangeParams params = new TFileScanRangeParams(); + params.format_type = formatType(fileGroupInfo.getFileGroup().getFileFormat(), ""); params.setStrictMode(fileGroupInfo.isStrictMode()); params.setProperties(fileGroupInfo.getBrokerDesc().getProperties()); if (fileGroupInfo.getBrokerDesc().getFileType() == TFileType.FILE_HDFS) {