Skip to content

Commit

Permalink
[Fix](ORC) Not push down fixed char type in orc reader (apache#45484)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
Problem Summary:
In Hive, the ORC file format supports fixed-length CHAR types (CHAR(n))
by padding strings with spaces to ensure the fixed length. When data is
written into ORC tables, the actual stored value includes additional
trailing spaces to meet the defined length. These padded spaces are also
considered during the computation of statistics.

However, in Doris, fixed-length CHAR types (CHAR(n)) and variable-length
VARCHAR types are internally represented as the same type. Doris does
not pad CHAR values with spaces and treats them as regular strings. As a
result, when Doris reads ORC files generated by Hive and parses the
statistics, the differences in the handling of CHAR types between the
two systems can lead to inconsistencies or incorrect statistics.
```sql
create table fixed_char_table (
  i int,
  c char(2)
) stored as orc;

insert into fixed_char_table values(1,'a'),(2,'b '), (3,'cd');
select * from fixed_char_table where c = 'a';
```
before
```text
empty
```
after
```text
1	a
```

If a Hive table undergoes a schema change, such as a column’s type being
modified from INT to STRING, predicate pushdown should be disabled in
such cases. Performing predicate pushdown under these circumstances may
lead to incorrect filtering, as the type mismatch can cause errors or
unexpected behavior during query execution.
```sql
create table type_changed_table (
  id int,
  name string 
) stored as orc;
insert into type_changed_table values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');
ALTER TABLE type_changed_table CHANGE COLUMN id id STRING;
select * from type_changed_table where id = '1';
select
```
before
```text
empty
```
after
```text
1	a
```
### Release note
[fix](orc) Not push down fixed char type in orc reader apache#45484
  • Loading branch information
suxiaogang223 authored Dec 23, 2024
1 parent 4fc9f92 commit f01f759
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 20 deletions.
24 changes: 16 additions & 8 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
bool enable_lazy_mat, std::vector<orc::TypeKind>* unsupported_pushdown_types)
bool enable_lazy_mat)
: _profile(profile),
_state(state),
_scan_params(params),
Expand All @@ -156,8 +156,7 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
_enable_lazy_mat(enable_lazy_mat),
_enable_filter_by_min_max(
state == nullptr ? true : state->query_options().enable_orc_filter_by_min_max),
_dict_cols_has_converted(false),
_unsupported_pushdown_types(unsupported_pushdown_types) {
_dict_cols_has_converted(false) {
TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
VecDateTimeValue t;
t.from_unixtime(0, ctz);
Expand Down Expand Up @@ -460,7 +459,8 @@ static std::unordered_map<orc::TypeKind, orc::PredicateDataType> TYPEKIND_TO_PRE
{orc::TypeKind::DOUBLE, orc::PredicateDataType::FLOAT},
{orc::TypeKind::STRING, orc::PredicateDataType::STRING},
{orc::TypeKind::BINARY, orc::PredicateDataType::STRING},
{orc::TypeKind::CHAR, orc::PredicateDataType::STRING},
// should not pust down CHAR type, because CHAR type is fixed length and will be padded
// {orc::TypeKind::CHAR, orc::PredicateDataType::STRING},
{orc::TypeKind::VARCHAR, orc::PredicateDataType::STRING},
{orc::TypeKind::DATE, orc::PredicateDataType::DATE},
{orc::TypeKind::DECIMAL, orc::PredicateDataType::DECIMAL},
Expand Down Expand Up @@ -492,8 +492,9 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,
[[fallthrough]];
case orc::TypeKind::BINARY:
[[fallthrough]];
case orc::TypeKind::CHAR:
[[fallthrough]];
// should not pust down CHAR type, because CHAR type is fixed length and will be padded
// case orc::TypeKind::CHAR:
// [[fallthrough]];
case orc::TypeKind::VARCHAR: {
return std::make_tuple(true, orc::Literal(literal_data.data, literal_data.size));
}
Expand Down Expand Up @@ -593,7 +594,15 @@ std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_lite
auto literal_data = literal->get_column_ptr()->get_data_at(0);
auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
auto slot_type = slot->type();
switch (slot_type.type) {
auto primitive_type = slot_type.type;
auto src_type = OrcReader::convert_to_doris_type(orc_type).type;
// should not down predicate for string type change from other type
if (src_type != primitive_type && !is_string_type(src_type) && is_string_type(primitive_type)) {
LOG(WARNING) << "Unsupported Push Down Schema Changed Column " << primitive_type << " to "
<< src_type;
return std::make_tuple(false, orc::Literal(false), orc::PredicateDataType::LONG);
}
switch (primitive_type) {
#define M(NAME) \
case TYPE_##NAME: { \
auto [valid, orc_literal] = convert_to_orc_literal<TYPE_##NAME>( \
Expand All @@ -606,7 +615,6 @@ std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_lite
M(INT) \
M(BIGINT) \
M(LARGEINT) \
M(CHAR) \
M(DATE) \
M(DATETIME) \
M(DATEV2) \
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ class OrcReader : public GenericReader {

OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, size_t batch_size, const std::string& ctz,
io::IOContext* io_ctx, bool enable_lazy_mat = true,
std::vector<orc::TypeKind>* unsupported_pushdown_types = nullptr);
io::IOContext* io_ctx, bool enable_lazy_mat = true);

OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat = true);
Expand Down Expand Up @@ -639,7 +638,6 @@ class OrcReader : public GenericReader {
std::unique_ptr<StringDictFilterImpl> _string_dict_filter;
bool _dict_cols_has_converted = false;
bool _has_complex_type = false;
std::vector<orc::TypeKind>* _unsupported_pushdown_types;

// resolve schema change
std::unordered_map<std::string, std::unique_ptr<converter::ColumnTypeConverter>> _converters;
Expand Down
10 changes: 1 addition & 9 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,17 +879,9 @@ Status VFileScanner::_get_next_reader() {
break;
}
case TFileFormatType::FORMAT_ORC: {
std::vector<orc::TypeKind>* unsupported_pushdown_types = nullptr;
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
static std::vector<orc::TypeKind> paimon_unsupport_type =
std::vector<orc::TypeKind> {orc::TypeKind::CHAR};
unsupported_pushdown_types = &paimon_unsupport_type;
}
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat,
unsupported_pushdown_types);
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
CREATE DATABASE IF NOT EXISTS multi_catalog;
USE multi_catalog;

create table fixed_char_table (
i int,
c char(2)
) stored as orc;

insert into fixed_char_table values(1,'a'),(2,'b '), (3,'cd');

create table type_changed_table (
id int,
name string
) stored as orc;
insert into type_changed_table values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');
ALTER TABLE type_changed_table CHANGE COLUMN id id STRING;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
set -x

CUR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"

# create table
hive -f "${CUR_DIR}"/orc_predicate_table.hql


Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !predicate_fixed_char1 --
1 a

-- !predicate_fixed_char2 --

-- !predicate_changed_type1 --
1 Alice

-- !predicate_changed_type2 --
2 Bob

-- !predicate_changed_type3 --
3 Charlie

-- !predicate_fixed_char1 --
1 a

-- !predicate_fixed_char2 --

-- !predicate_changed_type1 --
1 Alice

-- !predicate_changed_type2 --
2 Bob

-- !predicate_changed_type3 --
3 Charlie

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hive_orc_predicate", "p0,external,hive,external_docker,external_docker_hive") {

String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("diable Hive test.")
return;
}

for (String hivePrefix : ["hive2", "hive3"]) {
try {
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
String catalog_name = "${hivePrefix}_test_predicate"
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

sql """drop catalog if exists ${catalog_name}"""
sql """create catalog if not exists ${catalog_name} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
);"""
sql """use `${catalog_name}`.`multi_catalog`"""

qt_predicate_fixed_char1 """ select * from fixed_char_table where c = 'a';"""
qt_predicate_fixed_char2 """ select * from fixed_char_table where c = 'a ';"""

qt_predicate_changed_type1 """ select * from type_changed_table where id = '1';"""
qt_predicate_changed_type2 """ select * from type_changed_table where id = '2';"""
qt_predicate_changed_type3 """ select * from type_changed_table where id = '3';"""

sql """drop catalog if exists ${catalog_name}"""
} finally {
}
}
}

0 comments on commit f01f759

Please sign in to comment.