Skip to content

Commit

Permalink
[fix](variant) fix schema change for variant from not null to null
Browse files Browse the repository at this point in the history
1. cast between not null and null in cast_column
2. hierarchical_data_reader should fill null map when not null -> null in linked schema change
  • Loading branch information
eldenmoon committed Jan 2, 2025
1 parent 8cb4830 commit 5804a3d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 8 deletions.
7 changes: 7 additions & 0 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ class HierarchicalDataReader : public ColumnIterator {
assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column())
.clear_subcolumns_data();
} else {
if (dst->is_nullable()) {
// No nullable info exists in hierarchical data, so fill the null map with all non-null entries
ColumnUInt8& dst_null_map =
assert_cast<ColumnNullable&>(*dst).get_null_map_column();
auto fake_nullable_column = ColumnUInt8::create(nrows, 0);
dst_null_map.insert_range_from(*fake_nullable_column, 0, nrows);
}
ColumnObject& root_column = assert_cast<ColumnObject&>(*_root_reader->column);
root_column.clear_subcolumns_data();
}
Expand Down
22 changes: 14 additions & 8 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,17 @@ bool is_conversion_required_between_integers(const TypeIndex& lhs, const TypeInd

Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, ColumnPtr* result) {
ColumnsWithTypeAndName arguments {arg, {nullptr, type, type->get_name()}};
auto function = SimpleFunctionFactory::instance().get_function("CAST", arguments, type);
if (!function) {
return Status::InternalError("Not found cast function {} to {}", arg.type->get_name(),
type->get_name());
}
Block tmp_block {arguments};
size_t result_column = tmp_block.columns();
auto ctx = FunctionContext::create_context(nullptr, {}, {});

// To avoid losing null info, we should not call the cast function here, since the function framework
// will wrap Nullable around the Variant itself instead of around the root of the Variant
// correct output: Nullable(Array(int)) -> Nullable(Variant(Nullable(Array(int))))
// incorrect output: Nullable(Array(int)) -> Nullable(Variant(Array(int)))
if (WhichDataType(remove_nullable(type)).is_variant_type()) {
// If the source column is already a variant, only its nullable info differs from the dst column
if (WhichDataType(remove_nullable(arg.type)).is_variant_type()) {
*result = type->is_nullable() ? make_nullable(arg.column) : remove_nullable(arg.column);
return Status::OK();
}
// set variant root column/type to from column/type
auto variant = ColumnObject::create(true /*always nullable*/);
CHECK(arg.column->is_nullable());
Expand All @@ -174,6 +171,15 @@ Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, Co
return Status::OK();
}

auto function = SimpleFunctionFactory::instance().get_function("CAST", arguments, type);
if (!function) {
return Status::InternalError("Not found cast function {} to {}", arg.type->get_name(),
type->get_name());
}
Block tmp_block {arguments};
size_t result_column = tmp_block.columns();
auto ctx = FunctionContext::create_context(nullptr, {}, {});

if (WhichDataType(arg.type).is_nothing()) {
// cast from nothing to any type should result in nulls
*result = type->create_column_const_with_default_value(arg.column->size())
Expand Down
10 changes: 10 additions & 0 deletions regression-test/data/variant_p0/schema_change/schema_change.out
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,13 @@
1 hello world
1 hello world

-- !sql --
1 {"a":1.0}
2 {"a":111.1111}
3 {"a":"11111"}

-- !sql --
1 {"a":1.0}
2 {"a":111.1111}
3 {"a":"11111"}

Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,34 @@ suite("regression_test_variant_schema_change", "variant_type"){
sql """INSERT INTO ${table_name} SELECT k, v,v from ${table_name} limit 1111"""
// select from mv
qt_sql """select v['k1'], cast(v['k2'] as string) from ${table_name} order by k desc limit 10"""

// not null to null
sql "drop table if exists t"
sql """
create table t (
col0 int not null,
col1 variant NOT NULL
) UNIQUE KEY(`col0`)
DISTRIBUTED BY HASH(col0) BUCKETS 1 PROPERTIES ("replication_num" = "1", "disable_auto_compaction" = "false");
"""

sql """insert into t values (1, '{"a" : 1.0}')"""
sql """insert into t values (2, '{"a" : 111.1111}')"""
sql """insert into t values (3, '{"a" : "11111"}')"""
sql """insert into t values (4, '{"a" : 1111111111}')"""
sql """insert into t values (5, '{"a" : 1111.11111}')"""
sql """insert into t values (6, '{"a" : "11111"}')"""
sql """insert into t values (7, '{"a" : 11111.11111}')"""
sql "alter table t modify column col1 variant;"
wait_for_latest_op_on_table_finish("t", timeout)
qt_sql "select * from t order by col0 limit 3"
sql """insert into t values (1, '{"a" : 1.0}')"""
sql """insert into t values (2, '{"a" : 111.1111}')"""
sql """insert into t values (3, '{"a" : "11111"}')"""
sql """insert into t values (4, '{"a" : 1111111111}')"""
sql """insert into t values (5, '{"a" : 1111.11111}')"""
sql """insert into t values (6, '{"a" : "11111"}')"""
sql """insert into t values (7, '{"a" : 11111.11111}')"""
trigger_and_wait_compaction("t", "cumulative")
qt_sql "select * from t order by col0 limit 3"
}

0 comments on commit 5804a3d

Please sign in to comment.