Skip to content

Commit

Permalink
[Fix](Variant) fix some nested explode_variant_array bug and add more…
Browse files Browse the repository at this point in the history
… test (#44533)
  • Loading branch information
eldenmoon authored Dec 2, 2024
1 parent 7d7f7fe commit 1abfd10
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 14 deletions.
40 changes: 34 additions & 6 deletions be/src/vec/exprs/table_function/vexplode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@

#include "common/status.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nothing.h"
#include "vec/columns/column_object.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nothing.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/function_helpers.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"
Expand All @@ -37,6 +42,34 @@ VExplodeTableFunction::VExplodeTableFunction() {
_fn_name = "vexplode";
}

Status VExplodeTableFunction::_process_init_variant(Block* block, int value_column_idx) {
// explode variant array
const auto& variant_column = check_and_get_column<ColumnObject>(
remove_nullable(block->get_by_position(value_column_idx)
.column->convert_to_full_column_if_const())
.get());
_detail.output_as_variant = true;
if (!variant_column->is_null_root()) {
_array_column = variant_column->get_root();
// We need to wrap the output nested column within a variant column.
// Otherwise the type is missmatched
const auto* array_type = check_and_get_data_type<DataTypeArray>(
remove_nullable(variant_column->get_root_type()).get());
if (array_type == nullptr) {
return Status::NotSupported("explode not support none array type {}",
variant_column->get_root_type()->get_name());
}
_detail.nested_type = array_type->get_nested_type();
} else {
// null root, use nothing type
_array_column = ColumnNullable::create(ColumnArray::create(ColumnNothing::create(0)),
ColumnUInt8::create(0));
_array_column->assume_mutable()->insert_many_defaults(variant_column->size());
_detail.nested_type = std::make_shared<DataTypeNothing>();
}
return Status::OK();
}

Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) {
CHECK(_expr_context->root()->children().size() == 1)
<< "VExplodeTableFunction only support 1 child but has "
Expand All @@ -47,12 +80,7 @@ Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) {
&value_column_idx));
if (WhichDataType(remove_nullable(block->get_by_position(value_column_idx).type))
.is_variant_type()) {
// explode variant array
const auto& variant_column = check_and_get_column<ColumnObject>(
remove_nullable(block->get_by_position(value_column_idx)
.column->convert_to_full_column_if_const())
.get());
_array_column = variant_column->get_root();
RETURN_IF_ERROR(_process_init_variant(block, value_column_idx));
} else {
_array_column =
block->get_by_position(value_column_idx).column->convert_to_full_column_if_const();
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exprs/table_function/vexplode.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class VExplodeTableFunction : public TableFunction {
int get_value(MutableColumnPtr& column, int max_step) override;

private:
Status _process_init_variant(Block* block, int value_column_idx);
ColumnPtr _array_column;
ColumnArrayExecutionData _detail;
size_t _array_offset; // start offset of array[row_idx]
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/functions/array/function_array_contains_all.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,9 @@ class FunctionArrayContainsAll : public IFunction {
is_equal_value = false;
} else {
// all is not null, check the data is equal
const auto* left_column = assert_cast<const T*>(left_data.nested_col);
const auto* right_column = assert_cast<const T*>(right_data.nested_col);
const auto* left_column = assert_cast<const T*>(left_data.nested_col.get());
const auto* right_column =
assert_cast<const T*>(right_data.nested_col.get());
auto res = left_column->compare_at(left_nested_loop_pos, right_pos,
*right_column, -1);
is_equal_value = (res == 0);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/functions/array/function_array_distance.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class FunctionArrayDistance : public IFunction {

const auto& offsets1 = *arr1.offsets_ptr;
const auto& offsets2 = *arr2.offsets_ptr;
const auto& nested_col1 = assert_cast<const ColumnFloat64*>(arr1.nested_col);
const auto& nested_col2 = assert_cast<const ColumnFloat64*>(arr2.nested_col);
const auto& nested_col1 = assert_cast<const ColumnFloat64*>(arr1.nested_col.get());
const auto& nested_col2 = assert_cast<const ColumnFloat64*>(arr2.nested_col.get());
for (ssize_t row = 0; row < offsets1.size(); ++row) {
if (arr1.array_nullmap_data && arr1.array_nullmap_data[row]) {
dst_null_data[row] = true;
Expand Down
13 changes: 11 additions & 2 deletions be/src/vec/functions/array/function_array_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
#include "vec/columns/column_vector.h"
#include "vec/data_types/data_type.h"

namespace doris::vectorized {

Expand All @@ -45,12 +47,19 @@ bool extract_column_array_info(const IColumn& src, ColumnArrayExecutionData& dat

// extract array offsets and nested column
data.offsets_ptr = &data.array_col->get_offsets();
data.nested_col = &data.array_col->get_data();
data.nested_col = data.array_col->get_data_ptr();
// extract nested column is nullable
if (data.nested_col->is_nullable()) {
const auto& nested_null_col = reinterpret_cast<const ColumnNullable&>(*data.nested_col);
data.nested_nullmap_data = nested_null_col.get_null_map_data().data();
data.nested_col = nested_null_col.get_nested_column_ptr().get();
data.nested_col = nested_null_col.get_nested_column_ptr();
}
if (data.output_as_variant &&
!WhichDataType(remove_nullable(data.nested_type)).is_variant_type()) {
// set variant root column/type to from column/type
auto variant = ColumnObject::create(true /*always nullable*/);
variant->create_root(data.nested_type, make_nullable(data.nested_col)->assume_mutable());
data.nested_col = variant->get_ptr();
}
return true;
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/functions/array/function_array_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.
#pragma once

#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
Expand Down Expand Up @@ -55,7 +56,10 @@ struct ColumnArrayExecutionData {
const ColumnArray* array_col = nullptr;
const ColumnArray::Offsets64* offsets_ptr = nullptr;
const UInt8* nested_nullmap_data = nullptr;
const IColumn* nested_col = nullptr;
ColumnPtr nested_col = nullptr;
DataTypePtr nested_type = nullptr;
// wrap the nested column as variant column
bool output_as_variant = false;

ColumnArrayMutableData to_mutable_data() const {
ColumnArrayMutableData dst;
Expand Down
20 changes: 20 additions & 0 deletions regression-test/data/variant_p0/nested.out
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,23 @@ v.xx tinyint Yes false \N NONE
1 {"callLimit":3,"number":"02124713252","type":"HOME"}
1 {"callLimit":5,"number":"5550219210","type":"GSM"}

-- !sql --
2 {"nested":[{"ba":"11111"},{"a":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
4 {"nested":[{"baaa":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
5 {"nested":[{"ba":"11111"},{"a":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
6 {"nested":[{"mmm":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
7 {"nested":[{"ba":"11111"},{"a":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
8 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
9 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
11 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
12 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
13 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}

-- !explode_sql --

-- !explode_sql --
19 10

-- !explode_sql --
2 10

53 changes: 52 additions & 1 deletion regression-test/suites/variant_p0/nested.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,64 @@ parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level
sql """insert into var_nested2 select * from var_nested order by k limit 1024"""
qt_sql """select /*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_result_sink=false,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=false,partitioned_hash_join_rows_threshold=1048576,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadcast_join=false,enable_two_phase_read_opt=true,enable_common_expr_pushdown_for_inverted_index=true,enable_delete_sub_predicate_v2=true,min_revocable_mem=33554432,fetch_remote_schema_timeout_seconds=120,max_fetch_remote_schema_tablet_count=512,enable_join_spill=false,enable_sort_spill=false,enable_agg_spill=false,enable_force_spill=false,data_queue_max_blocks=1,spill_streaming_agg_mem_limit=268435456,external_agg_partition_bits=5) */ * from var_nested2 order by k limit 10;"""
qt_sql """select v['nested'] from var_nested2 where k < 10 order by k limit 10;"""
// explode variant array
// 0. nomal explode variant array
order_qt_explode_sql """select count(),cast(vv['xx'] as int) from var_nested lateral view explode_variant_array(v['nested']) tmp as vv where vv['xx'] = 10 group by cast(vv['xx'] as int)"""
sql """truncate table var_nested2"""
sql """insert into var_nested2 values(1119111, '{"eventId":1,"firstName":"Name1","lastName":"Surname1","body":{"phoneNumbers":[{"number":"5550219210","type":"GSM","callLimit":5},{"number":"02124713252","type":"HOME","callLimit":3},{"number":"05550219211","callLimit":2,"type":"WORK"}]}}
')"""
order_qt_explode_sql """select v['eventId'], phone_numbers from var_nested2 lateral view explode_variant_array(v['body']['phoneNumbers']) tmp1 as phone_numbers
where phone_numbers['type'] = 'GSM' OR phone_numbers['type'] = 'HOME' and phone_numbers['callLimit'] > 2;"""

// test array_function
sql "DROP TABLE IF EXISTS var_nested_array_agg"
sql """
CREATE TABLE IF NOT EXISTS var_nested_array_agg(
k bigint,
v variant
)
UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "1", "disable_auto_compaction" = "false", "enable_unique_key_merge_on_write" = "true", "variant_enable_flatten_nested" = "true");
"""
sql "insert into var_nested_array_agg select * from var_nested"
// 1. array_contains
qt_sql "select * from var_nested_array_agg where array_contains(cast(v['nested']['xx'] as array<int>), 10) order by k limit 10"
// 2. array_agg scalar
sql "select k, array_agg(cast(v['nested'] as text)) from var_nested_array_agg group by k limit 10"

// test explode_variant_array with abonomal case
sql "DROP TABLE IF EXISTS var_nested_explode_variant_with_abnomal"
sql """
CREATE TABLE IF NOT EXISTS var_nested_explode_variant_with_abnomal(
k bigint,
v variant
)
UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "1", "disable_auto_compaction" = "false", "enable_unique_key_merge_on_write" = "true", "variant_enable_flatten_nested" = "true");
"""
sql "insert into var_nested_explode_variant_with_abnomal select * from var_nested"
// 1. v['nested']['x'] is null root
order_qt_explode_sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['nested']['x']) tmp as vv where vv = 10 group by cast(vv as int)"""
// 2. v['nested']['xx'] is normal array
order_qt_explode_sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['nested']['xx']) tmp as vv where vv = 10 group by cast(vv as int)"""
// 3. v['xx'] is none array scalar type
test {
sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['xx']) tmp as vv where vv = 10 group by cast(vv as int)"""
exception("explode not support none array type")
}
// 4. v['k1'] is json scalar type
test {
sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['k1']) tmp as vv where vv = 10 group by cast(vv as int)"""
exception("explode not support none array type")
}
// 5. toplevel nested array
sql "truncate table var_nested_explode_variant_with_abnomal"
sql """insert into var_nested_explode_variant_with_abnomal values(1, '[{"a" : 10}, {"b" : "20", "c" :1024, "a" : 11}]')"""
sql """insert into var_nested_explode_variant_with_abnomal values(2, '[{"a" : 10}, {"b" : "20", "a" : 150}]')"""
order_qt_explode_sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['a']) tmp as vv where vv = 10 group by cast(vv as int)"""
// FIXME after refator
// order_qt_explode_sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v) tmp as vv where vv['a'] = 10 group by cast(vv as int)"""
} finally {
// reset flags
}
Expand Down

0 comments on commit 1abfd10

Please sign in to comment.