Skip to content

Commit

Permalink
[opt](serde)Optimize the filling of fixed values ​​into block columns…
Browse files Browse the repository at this point in the history
… without repeated deserialization. (apache#37377)

## Proposed changes

Since the value of the partition column is fixed when querying the
partition table, we can deserialize the value only once and then
repeatedly insert the value into the block.
```sql
in Hive: 
CREATE TABLE parquet_partition_tb (
    col1 STRING,
    col2 INT,
    col3 DOUBLE
) PARTITIONED BY (
    partition_col1 STRING,
    partition_col2 INT
)
STORED AS PARQUET;

insert into  parquet_partition_tb partition (partition_col1="hello",partition_col2=1) values("word",2,2.3);

insert into parquet_partition_tb partition(partition_col1="hello",partition_col2=1 )  
select col1,col2,col3 from  parquet_partition_tb where partition_col1="hello" and partition_col2=1;
Repeat the `insert into xxx select  xxx`operation several times.


Doris :
before:
mysql>  select count(partition_col1) from parquet_partition_tb;
+-----------------------+
| count(partition_col1) |
+-----------------------+
|              33554432 |
+-----------------------+
1 row in set (3.24 sec)

mysql>  select count(partition_col2) from parquet_partition_tb;
+-----------------------+
| count(partition_col2) |
+-----------------------+
|              33554432 |
+-----------------------+
1 row in set (3.34 sec)


after:
mysql>  select count(partition_col1) from parquet_partition_tb ;
+-----------------------+
| count(partition_col1) |
+-----------------------+
|              33554432 |
+-----------------------+
1 row in set (0.79 sec)

mysql> select count(partition_col2) from parquet_partition_tb;
+-----------------------+
| count(partition_col2) |
+-----------------------+
|              33554432 |
+-----------------------+
1 row in set (0.51 sec)

```
## Summary:
test sql `select count(partition_col) from tbl;`
Number of lines : 33554432
| |before | after|
|---|---|--|
|boolean |  3.96|0.47  | 
|tinyint  |  3.39|0.47  |  
|smallint |  3.14|0.50   |
|int    |3.34|0.51   | 
|bigint  |   3.61|0.51  |
|float   | 4.59 |0.51  | 
|double   |4.60| 0.55  | 
|decimal(5,2)|  3.96  |0.61 | 
|date   | 5.80|0.52    | 
|timestamp |  7.68 | 0.52 | 
|string  |  3.24 |0.79   | 

Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
hubgeter authored Jul 9, 2024
1 parent 0833dbe commit b48b5da
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 18 deletions.
21 changes: 21 additions & 0 deletions be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,4 +247,25 @@ Status DataTypeDateTimeV2SerDe::write_column_to_orc(const std::string& timezone,
return Status::OK();
}

Status DataTypeDateTimeV2SerDe::deserialize_column_from_fixed_json(
IColumn& column, Slice& slice, int rows, int* num_deserialized,
const FormatOptions& options) const {
Status st = deserialize_one_cell_from_json(column, slice, options);
if (!st.ok()) {
return st;
}

DataTypeDateTimeV2SerDe::insert_column_last_value_multiple_times(column, rows - 1);
*num_deserialized = rows;
return Status::OK();
}

void DataTypeDateTimeV2SerDe::insert_column_last_value_multiple_times(IColumn& column,
int times) const {
auto& col = static_cast<ColumnVector<UInt64>&>(column);
auto sz = col.size();
UInt64 val = col.get_element(sz - 1);
col.insert_many_vals(val, times);
}

} // namespace doris::vectorized
5 changes: 5 additions & 0 deletions be/src/vec/data_types/serde/data_type_datetimev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<UInt64> {
int start, int end,
std::vector<StringRef>& buffer_list) const override;

Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;
void insert_column_last_value_multiple_times(IColumn& column, int times) const override;

private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
Expand Down
21 changes: 21 additions & 0 deletions be/src/vec/data_types/serde/data_type_datev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,26 @@ Status DataTypeDateV2SerDe::write_column_to_orc(const std::string& timezone, con
return Status::OK();
}

Status DataTypeDateV2SerDe::deserialize_column_from_fixed_json(IColumn& column, Slice& slice,
int rows, int* num_deserialized,
const FormatOptions& options) const {
Status st = deserialize_one_cell_from_json(column, slice, options);
if (!st.ok()) {
return st;
}
DataTypeDateV2SerDe::insert_column_last_value_multiple_times(column, rows - 1);
*num_deserialized = rows;
return Status::OK();
}

void DataTypeDateV2SerDe::insert_column_last_value_multiple_times(IColumn& column,
int times) const {
auto& col = static_cast<ColumnVector<UInt32>&>(column);
auto sz = col.size();
UInt32 val = col.get_element(sz - 1);

col.insert_many_vals(val, times);
}

} // namespace vectorized
} // namespace doris
6 changes: 6 additions & 0 deletions be/src/vec/data_types/serde/data_type_datev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ class DataTypeDateV2SerDe : public DataTypeNumberSerDe<UInt32> {
int start, int end,
std::vector<StringRef>& buffer_list) const override;

Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;

void insert_column_last_value_multiple_times(IColumn& column, int times) const override;

private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
Expand Down
26 changes: 26 additions & 0 deletions be/src/vec/data_types/serde/data_type_decimal_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,32 @@ Status DataTypeDecimalSerDe<T>::write_column_to_orc(const std::string& timezone,
}
return Status::OK();
}
template <typename T>

Status DataTypeDecimalSerDe<T>::deserialize_column_from_fixed_json(
IColumn& column, Slice& slice, int rows, int* num_deserialized,
const FormatOptions& options) const {
Status st = deserialize_one_cell_from_json(column, slice, options);
if (!st.ok()) {
return st;
}

DataTypeDecimalSerDe::insert_column_last_value_multiple_times(column, rows - 1);
*num_deserialized = rows;
return Status::OK();
}

template <typename T>
void DataTypeDecimalSerDe<T>::insert_column_last_value_multiple_times(IColumn& column,
int times) const {
auto& col = static_cast<ColumnDecimal<T>&>(column);
auto sz = col.size();

T val = col.get_element(sz - 1);
for (int i = 0; i < times; i++) {
col.insert_value(val);
}
}

template class DataTypeDecimalSerDe<Decimal32>;
template class DataTypeDecimalSerDe<Decimal64>;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/data_types/serde/data_type_decimal_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ class DataTypeDecimalSerDe : public DataTypeSerDe {
int start, int end,
std::vector<StringRef>& buffer_list) const override;

Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;

void insert_column_last_value_multiple_times(IColumn& column, int times) const override;

private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
Expand Down
20 changes: 20 additions & 0 deletions be/src/vec/data_types/serde/data_type_nullable_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,26 @@ Status DataTypeNullableSerDe::deserialize_column_from_hive_text_vector(
return Status::OK();
}

Status DataTypeNullableSerDe::deserialize_column_from_fixed_json(
IColumn& column, Slice& slice, int rows, int* num_deserialized,
const FormatOptions& options) const {
auto& col = static_cast<ColumnNullable&>(column);
Status st = deserialize_one_cell_from_json(column, slice, options);
if (!st.ok()) {
return st;
}
auto& null_map = col.get_null_map_data();
auto& nested_column = col.get_nested_column();

null_map.resize_fill(
rows, null_map.back()); // data_type_nullable::insert_column_last_value_multiple_times()
if (rows - 1 != 0) {
nested_serde->insert_column_last_value_multiple_times(nested_column, rows - 1);
}
*num_deserialized = rows;
return Status::OK();
}

Status DataTypeNullableSerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
auto& null_column = assert_cast<ColumnNullable&>(column);
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/data_types/serde/data_type_nullable_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class DataTypeNullableSerDe : public DataTypeSerDe {
int* num_deserialized,
const FormatOptions& options) const override;

Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;
Status deserialize_one_cell_from_hive_text(
IColumn& column, Slice& slice, const FormatOptions& options,
int hive_text_complex_type_delimiter_level = 1) const override;
Expand Down
22 changes: 22 additions & 0 deletions be/src/vec/data_types/serde/data_type_number_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,28 @@ void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
const auto* raw_data = reinterpret_cast<const T*>(buffer->data()) + start;
col_data.insert(raw_data, raw_data + row_count);
}
template <typename T>
Status DataTypeNumberSerDe<T>::deserialize_column_from_fixed_json(
IColumn& column, Slice& slice, int rows, int* num_deserialized,
const FormatOptions& options) const {
Status st = deserialize_one_cell_from_json(column, slice, options);
if (!st.ok()) {
return st;
}

DataTypeNumberSerDe::insert_column_last_value_multiple_times(column, rows - 1);
*num_deserialized = rows;
return Status::OK();
}

template <typename T>
void DataTypeNumberSerDe<T>::insert_column_last_value_multiple_times(IColumn& column,
int times) const {
auto& col = static_cast<ColumnVector<T>&>(column);
auto sz = col.size();
T val = col.get_element(sz - 1);
col.insert_many_vals(val, times);
}

template <typename T>
template <bool is_binary_format>
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/data_types/serde/data_type_number_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ class DataTypeNumberSerDe : public DataTypeSerDe {
int* num_deserialized,
const FormatOptions& options) const override;

Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;

void insert_column_last_value_multiple_times(IColumn& column, int times) const override;

Status write_column_to_pb(const IColumn& column, PValues& result, int start,
int end) const override;
Status read_column_from_pb(IColumn& column, const PValues& arg) const override;
Expand Down
21 changes: 21 additions & 0 deletions be/src/vec/data_types/serde/data_type_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,27 @@ class DataTypeSerDe {
virtual Status deserialize_column_from_json_vector(IColumn& column, std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options) const = 0;
// deserialize fixed values.Repeatedly insert the value row times into the column.
virtual Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const {
Status st = deserialize_one_cell_from_json(column, slice, options);
if (!st.ok()) {
*num_deserialized = 0;
return st;
}
insert_column_last_value_multiple_times(column, rows - 1);
*num_deserialized = rows;
return Status::OK();
}
// Insert the last value to the end of this column multiple times.
virtual void insert_column_last_value_multiple_times(IColumn& column, int times) const {
//If you try to simplify this operation by using `column.insert_many_from(column, column.size() - 1, rows - 1);`
// you are likely to get incorrect data results.
MutableColumnPtr dum_col = column.clone_empty();
dum_col->insert_from(column, column.size() - 1);
column.insert_many_from(*dum_col.get(), 0, times);
}

virtual Status deserialize_one_cell_from_hive_text(
IColumn& column, Slice& slice, const FormatOptions& options,
Expand Down
25 changes: 25 additions & 0 deletions be/src/vec/data_types/serde/data_type_string_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,31 @@ class DataTypeStringSerDeBase : public DataTypeSerDe {
}
return Status::OK();
}

Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override {
Status st = deserialize_one_cell_from_json(column, slice, options);
if (!st.ok()) {
return st;
}

DataTypeStringSerDeBase::insert_column_last_value_multiple_times(column, rows - 1);
*num_deserialized = rows;
return Status::OK();
}

void insert_column_last_value_multiple_times(IColumn& column, int times) const override {
auto& col = static_cast<ColumnString&>(column);
auto sz = col.size();

StringRef ref = col.get_data_at(sz - 1);
String str(ref.data, ref.size);
std::vector<StringRef> refs(times, {str.data(), str.size()});

col.insert_many_strings(refs.data(), refs.size());
}

Status read_column_from_pb(IColumn& column, const PValues& arg) const override {
auto& column_dest = assert_cast<ColumnType&>(column);
column_dest.reserve(column_dest.size() + arg.string_value_size());
Expand Down
9 changes: 3 additions & 6 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -935,13 +935,10 @@ Status OrcReader::_fill_partition_columns(
auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
vector<Slice> slices(rows);
for (int i = 0; i < rows; i++) {
slices[i] = {value.data(), value.size()};
}
int num_deserialized = 0;
if (_text_serde->deserialize_column_from_json_vector(*col_ptr, slices, &num_deserialized,
_text_formatOptions) != Status::OK()) {
if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
&num_deserialized,
_text_formatOptions) != Status::OK()) {
return Status::InternalError("Failed to fill partition column: {}={}",
slot_desc->col_name(), value);
}
Expand Down
9 changes: 3 additions & 6 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,13 +631,10 @@ Status RowGroupReader::_fill_partition_columns(
auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
vector<Slice> slices(rows);
for (int i = 0; i < rows; i++) {
slices[i] = {value.data(), value.size()};
}
int num_deserialized = 0;
if (_text_serde->deserialize_column_from_json_vector(*col_ptr, slices, &num_deserialized,
_text_formatOptions) != Status::OK()) {
if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
&num_deserialized,
_text_formatOptions) != Status::OK()) {
return Status::InternalError("Failed to fill partition column: {}={}",
slot_desc->col_name(), value);
}
Expand Down
9 changes: 3 additions & 6 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,10 @@ Status VFileScanner::_fill_columns_from_path(size_t rows) {
auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
vector<Slice> slices(rows);
for (int i = 0; i < rows; i++) {
slices[i] = {value.data(), value.size()};
}
int num_deserialized = 0;
if (_text_serde->deserialize_column_from_json_vector(*col_ptr, slices, &num_deserialized,
_text_formatOptions) != Status::OK()) {
if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
&num_deserialized,
_text_formatOptions) != Status::OK()) {
return Status::InternalError("Failed to fill partition column: {}={}",
slot_desc->col_name(), value);
}
Expand Down

0 comments on commit b48b5da

Please sign in to comment.