Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reader should start with prefix #46060

Merged
merged 2 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,15 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator,
std::unique_ptr<ColumnIterator>(inner_iter));
return Status::OK();
}
// Check if path is prefix, example sparse columns path: a.b.c, a.b.e, access prefix: a.b
// Check if path is prefix, example sparse columns path: a.b.c, a.b.e, access prefix: a.b.
// then we must read the sparse columns
bool prefix_existed_in_sparse_column =
_statistics &&
(_statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path()) !=
_statistics->sparse_column_non_null_size.end());
_statistics->sparse_column_non_null_size.end()) &&
_statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path())
->first.starts_with(relative_path.get_path() + ".");

// Otherwise the prefix is not exist and the sparse column size is reached limit
// which means the path maybe exist in sparse_column
bool exceeded_sparse_column_limit =
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ class HierarchicalDataReader : public ColumnIterator {

_rows_read += nrows;
variant.finalize();
RETURN_IF_ERROR(_init_null_map_and_clear_columns(container, dst, nrows));
#ifndef NDEBUG
variant.check_consistency();
#endif
RETURN_IF_ERROR(_init_null_map_and_clear_columns(container, dst, nrows));

return Status::OK();
}
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ class MultiBlockMerger {

for (int idx = 0; idx < columns; idx++) {
auto column = finalized_block.get_by_position(idx).column->assume_mutable();

for (int j = 0; j < limit; j++) {
auto row_ref = pushed_row_refs[i + j];
column->insert_from(*row_ref.get_column(idx), row_ref.position);
Expand Down
34 changes: 14 additions & 20 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,9 @@ void get_field_info(const Field& field, FieldInfo* info) {
}

#ifdef NDEBUG
#define ENABLE_CHECK_CONSISTENCY /* Nothing */
#define ENABLE_CHECK_CONSISTENCY (void) /* Nothing */
#else
#define ENABLE_CHECK_CONSISTENCY(this) this->check_consistency()
#define ENABLE_CHECK_CONSISTENCY(this) (this)->check_consistency()
#endif

// current nested level is 2, inside column object
Expand Down Expand Up @@ -642,7 +642,7 @@ MutableColumnPtr ColumnObject::apply_for_columns(Func&& func) const {
auto sparse_column = func(serialized_sparse_column);
res->serialized_sparse_column = sparse_column->assume_mutable();
res->num_rows = res->serialized_sparse_column->size();
res->check_consistency();
ENABLE_CHECK_CONSISTENCY(res.get());
return res;
}

Expand Down Expand Up @@ -837,9 +837,7 @@ void ColumnObject::check_consistency() const {
}

size_t ColumnObject::size() const {
#ifndef NDEBUG
check_consistency();
#endif
ENABLE_CHECK_CONSISTENCY(this);
return num_rows;
}

Expand Down Expand Up @@ -875,6 +873,10 @@ void ColumnObject::for_each_subcolumn(ColumnCallback callback) {
callback(part);
}
}
callback(serialized_sparse_column);
// callback may be filter, so the row count may be changed
num_rows = serialized_sparse_column->size();
ENABLE_CHECK_CONSISTENCY(this);
}

void ColumnObject::insert_from(const IColumn& src, size_t n) {
Expand All @@ -896,6 +898,7 @@ void ColumnObject::try_insert(const Field& field) {
if (field.get_type() != Field::Types::VariantMap) {
if (field.is_null()) {
insert_default();
ENABLE_CHECK_CONSISTENCY(this);
return;
}
auto* root = get_subcolumn({});
Expand Down Expand Up @@ -1267,10 +1270,8 @@ bool ColumnObject::try_add_new_subcolumn(const PathInData& path) {

void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t length) {
const auto& src_object = assert_cast<const ColumnObject&>(src);
#ifndef NDEBUG
check_consistency();
src_object.check_consistency();
#endif
ENABLE_CHECK_CONSISTENCY(&src_object);
ENABLE_CHECK_CONSISTENCY(this);

// First, insert src subcolumns
// We can reach the limit of subcolumns, and in this case
Expand Down Expand Up @@ -1981,6 +1982,7 @@ Status ColumnObject::finalize(FinalizeMode mode) {
std::swap(subcolumns, new_subcolumns);
doc_structure = nullptr;
_prev_positions.clear();
ENABLE_CHECK_CONSISTENCY(this);
return Status::OK();
}

Expand Down Expand Up @@ -2022,7 +2024,7 @@ ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const {
}
if (subcolumns.empty()) {
auto res = ColumnObject::create(count_bytes_in_filter(filter));
res->check_consistency();
ENABLE_CHECK_CONSISTENCY(res.get());
return res;
}
auto new_column = ColumnObject::create(true, false);
Expand All @@ -2032,7 +2034,7 @@ ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const {
entry->data.get_least_common_type());
}
new_column->serialized_sparse_column = serialized_sparse_column->filter(filter, count);
new_column->check_consistency();
ENABLE_CHECK_CONSISTENCY(new_column.get());
return new_column;
}

Expand Down Expand Up @@ -2065,14 +2067,6 @@ size_t ColumnObject::filter(const Filter& filter) {
}
}
});
const auto result_size = serialized_sparse_column->filter(filter);
if (result_size != count) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"result_size not euqal with filter_size, result_size={}, "
"filter_size={}",
result_size, count);
}
CHECK_EQ(result_size, count);
}
num_rows = count;
ENABLE_CHECK_CONSISTENCY(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,27 @@
-- !select_all --
3 1234 \N ddddd 1 \N

-- !sql --
\N
\N
\N
\N

-- !sql --
3
3
3
3

-- !sql --
2
2
2
2

-- !sql --
1
1
1
1

Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){
)
DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
properties("replication_num" = "1", "disable_auto_compaction" = "true", "bloom_filter_columns" = "v", "variant_enable_flatten_nested" = "true");
properties("replication_num" = "1", "disable_auto_compaction" = "true", "bloom_filter_columns" = "v", "variant_enable_flatten_nested" = "false");
"""

// 2015
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,5 @@ suite("regression_test_variant_schema_change", "variant_type"){
sql """INSERT INTO ${table_name} SELECT k, v,v from ${table_name} limit 1111"""
// select from mv
qt_sql """select v['k1'], cast(v['k2'] as string) from ${table_name} order by k desc limit 10"""
qt_sql """select * from ${table_name} order by k desc limit 10"""
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
qt_select_5_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42004;"""
qt_select_6_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42005;"""
qt_select_all """SELECT k, v['a'], v['b'], v['xxxx'], v['point'], v['ddddd'] from ${tableName} where (cast(v['point'] as int) = 1);"""

sql "truncate table ${tableName}"
sql """insert into ${tableName} values (1, '{"1" : 1, "2" : 2, "3" : 3, "4": "4", "a" : 1, "aa":2, "aaa" : 3,"aaaaaa":1234, ".a." : 1}')"""
sql """insert into ${tableName} values (1, '{"1" : 1, "2" : 2, "3" : 3, "4": "4", "a" : 1, "aa":2, "aaa" : 3,"aaaaaa":1234, ".a." : 1}')"""
sql """insert into ${tableName} values (1, '{"1" : 1, "2" : 2, "3" : 3, "4": "4", "a" : 1, "aa":2, "aaa" : 3,"aaaaaa":1234, ".a." : 1}')"""
sql """insert into ${tableName} values (1, '{"1" : 1, "2" : 2, "3" : 3, "4": "4", "a" : 1, "aa":2, "aaa" : 3,"aaaaaa":1234, ".a." : 1}')"""
qt_sql """select v['aaaa'] from ${tableName}"""
qt_sql """select v['aaa'] from ${tableName}"""
qt_sql """select v['aa'] from ${tableName}"""
qt_sql """select v['1'] from ${tableName}"""
} finally {
// try_sql("DROP TABLE IF EXISTS ${tableName}")
GetDebugPoint().disableDebugPointForAllBEs("variant_column_writer_impl._get_subcolumn_paths_from_stats")
Expand Down
Loading