From fa2ff27567c8125a52ba91defc528a2b885d9deb Mon Sep 17 00:00:00 2001 From: eldenmoon Date: Thu, 26 Dec 2024 22:37:36 +0800 Subject: [PATCH 1/2] fix reader should start with prefix --- be/src/olap/rowset/segment_v2/column_reader.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index c3c41d968ae3d4..eea90de8da34b9 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -304,7 +304,10 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, bool prefix_existed_in_sparse_column = _statistics && (_statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path()) != - _statistics->sparse_column_non_null_size.end()); + _statistics->sparse_column_non_null_size.end()) && + _statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path()) + ->first.starts_with(relative_path.get_path()); + // Otherwise the prefix is not exist and the sparse column size is reached limit // which means the path maybe exist in sparse_column bool exceeded_sparse_column_limit = From 264fdd232d61dfdad65cc5616c6ba7a9de7af510 Mon Sep 17 00:00:00 2001 From: eldenmoon Date: Mon, 30 Dec 2024 20:39:47 +0800 Subject: [PATCH 2/2] fix --- .../olap/rowset/segment_v2/column_reader.cpp | 4 +-- .../segment_v2/hierarchical_data_reader.h | 2 +- be/src/olap/schema_change.cpp | 1 - be/src/vec/columns/column_object.cpp | 34 ++++++++----------- .../compaction/compaction_sparse_column.out | 24 +++++++++++++ .../variant_github_events_p2/load.groovy | 2 +- .../schema_change/schema_change.groovy | 1 + .../compaction_sparse_column.groovy | 10 ++++++ 8 files changed, 53 insertions(+), 25 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index eea90de8da34b9..97e8b1ca1da9fa 100644 --- 
a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -299,14 +299,14 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, std::unique_ptr(inner_iter)); return Status::OK(); } - // Check if path is prefix, example sparse columns path: a.b.c, a.b.e, access prefix: a.b + // Check if path is prefix, example sparse columns path: a.b.c, a.b.e, access prefix: a.b. // then we must read the sparse columns bool prefix_existed_in_sparse_column = _statistics && (_statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path()) != _statistics->sparse_column_non_null_size.end()) && _statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path()) - ->first.starts_with(relative_path.get_path()); + ->first.starts_with(relative_path.get_path() + "."); // Otherwise the prefix is not exist and the sparse column size is reached limit // which means the path maybe exist in sparse_column diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index 369dff92829e85..c50ac26169e2dc 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -153,10 +153,10 @@ class HierarchicalDataReader : public ColumnIterator { _rows_read += nrows; variant.finalize(); + RETURN_IF_ERROR(_init_null_map_and_clear_columns(container, dst, nrows)); #ifndef NDEBUG variant.check_consistency(); #endif - RETURN_IF_ERROR(_init_null_map_and_clear_columns(container, dst, nrows)); return Status::OK(); } diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index ec291d8d2f0068..bc41365dbd4e11 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -209,7 +209,6 @@ class MultiBlockMerger { for (int idx = 0; idx < columns; idx++) { auto column = finalized_block.get_by_position(idx).column->assume_mutable(); - for (int j = 0; j < 
limit; j++) { auto row_ref = pushed_row_refs[i + j]; column->insert_from(*row_ref.get_column(idx), row_ref.position); diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 8cd3b089f66a5a..2ce8ce791edf3b 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -352,9 +352,9 @@ void get_field_info(const Field& field, FieldInfo* info) { } #ifdef NDEBUG -#define ENABLE_CHECK_CONSISTENCY /* Nothing */ +#define ENABLE_CHECK_CONSISTENCY (void) /* Nothing */ #else -#define ENABLE_CHECK_CONSISTENCY(this) this->check_consistency() +#define ENABLE_CHECK_CONSISTENCY(this) (this)->check_consistency() #endif // current nested level is 2, inside column object @@ -642,7 +642,7 @@ MutableColumnPtr ColumnObject::apply_for_columns(Func&& func) const { auto sparse_column = func(serialized_sparse_column); res->serialized_sparse_column = sparse_column->assume_mutable(); res->num_rows = res->serialized_sparse_column->size(); - res->check_consistency(); + ENABLE_CHECK_CONSISTENCY(res.get()); return res; } @@ -837,9 +837,7 @@ void ColumnObject::check_consistency() const { } size_t ColumnObject::size() const { -#ifndef NDEBUG - check_consistency(); -#endif + ENABLE_CHECK_CONSISTENCY(this); return num_rows; } @@ -875,6 +873,10 @@ void ColumnObject::for_each_subcolumn(ColumnCallback callback) { callback(part); } } + callback(serialized_sparse_column); + // callback may be filter, so the row count may be changed + num_rows = serialized_sparse_column->size(); + ENABLE_CHECK_CONSISTENCY(this); } void ColumnObject::insert_from(const IColumn& src, size_t n) { @@ -896,6 +898,7 @@ void ColumnObject::try_insert(const Field& field) { if (field.get_type() != Field::Types::VariantMap) { if (field.is_null()) { insert_default(); + ENABLE_CHECK_CONSISTENCY(this); return; } auto* root = get_subcolumn({}); @@ -1267,10 +1270,8 @@ bool ColumnObject::try_add_new_subcolumn(const PathInData& path) { void 
ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t length) { const auto& src_object = assert_cast(src); -#ifndef NDEBUG - check_consistency(); - src_object.check_consistency(); -#endif + ENABLE_CHECK_CONSISTENCY(&src_object); + ENABLE_CHECK_CONSISTENCY(this); // First, insert src subcolumns // We can reach the limit of subcolumns, and in this case @@ -1981,6 +1982,7 @@ Status ColumnObject::finalize(FinalizeMode mode) { std::swap(subcolumns, new_subcolumns); doc_structure = nullptr; _prev_positions.clear(); + ENABLE_CHECK_CONSISTENCY(this); return Status::OK(); } @@ -2022,7 +2024,7 @@ ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const { } if (subcolumns.empty()) { auto res = ColumnObject::create(count_bytes_in_filter(filter)); - res->check_consistency(); + ENABLE_CHECK_CONSISTENCY(res.get()); return res; } auto new_column = ColumnObject::create(true, false); @@ -2032,7 +2034,7 @@ ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const { entry->data.get_least_common_type()); } new_column->serialized_sparse_column = serialized_sparse_column->filter(filter, count); - new_column->check_consistency(); + ENABLE_CHECK_CONSISTENCY(new_column.get()); return new_column; } @@ -2065,14 +2067,6 @@ size_t ColumnObject::filter(const Filter& filter) { } } }); - const auto result_size = serialized_sparse_column->filter(filter); - if (result_size != count) { - throw Exception(ErrorCode::INTERNAL_ERROR, - "result_size not euqal with filter_size, result_size={}, " - "filter_size={}", - result_size, count); - } - CHECK_EQ(result_size, count); } num_rows = count; ENABLE_CHECK_CONSISTENCY(this); diff --git a/regression-test/data/variant_p1/compaction/compaction_sparse_column.out b/regression-test/data/variant_p1/compaction/compaction_sparse_column.out index 520eaf5f84ce41..b060cb9f57ca4c 100644 --- a/regression-test/data/variant_p1/compaction/compaction_sparse_column.out +++ 
b/regression-test/data/variant_p1/compaction/compaction_sparse_column.out @@ -95,3 +95,27 @@ -- !select_all -- 3 1234 \N ddddd 1 \N +-- !sql -- +\N +\N +\N +\N + +-- !sql -- +3 +3 +3 +3 + +-- !sql -- +2 +2 +2 +2 + +-- !sql -- +1 +1 +1 +1 + diff --git a/regression-test/suites/variant_github_events_p2/load.groovy b/regression-test/suites/variant_github_events_p2/load.groovy index d4c74474eaf96c..abcdfb689c1e5d 100644 --- a/regression-test/suites/variant_github_events_p2/load.groovy +++ b/regression-test/suites/variant_github_events_p2/load.groovy @@ -158,7 +158,7 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){ ) DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(k) BUCKETS 4 - properties("replication_num" = "1", "disable_auto_compaction" = "true", "bloom_filter_columns" = "v", "variant_enable_flatten_nested" = "true"); + properties("replication_num" = "1", "disable_auto_compaction" = "true", "bloom_filter_columns" = "v", "variant_enable_flatten_nested" = "false"); """ // 2015 diff --git a/regression-test/suites/variant_p0/schema_change/schema_change.groovy b/regression-test/suites/variant_p0/schema_change/schema_change.groovy index 42cef32c8e5641..3c35ff28100d63 100644 --- a/regression-test/suites/variant_p0/schema_change/schema_change.groovy +++ b/regression-test/suites/variant_p0/schema_change/schema_change.groovy @@ -79,4 +79,5 @@ suite("regression_test_variant_schema_change", "variant_type"){ sql """INSERT INTO ${table_name} SELECT k, v,v from ${table_name} limit 1111""" // select from mv qt_sql """select v['k1'], cast(v['k2'] as string) from ${table_name} order by k desc limit 10""" + qt_sql """select * from ${table_name} order by k desc limit 10""" } \ No newline at end of file diff --git a/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy b/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy index 32d16b040a3a3b..b3cca5910e8834 100644 --- 
a/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy +++ b/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy @@ -201,6 +201,16 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") { qt_select_5_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42004;""" qt_select_6_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42005;""" qt_select_all """SELECT k, v['a'], v['b'], v['xxxx'], v['point'], v['ddddd'] from ${tableName} where (cast(v['point'] as int) = 1);""" + + sql "truncate table ${tableName}" + sql """insert into ${tableName} values (1, '{"1" : 1, "2" : 2, "3" : 3, "4": "4", "a" : 1, "aa":2, "aaa" : 3,"aaaaaa":1234, ".a." : 1}')""" + sql """insert into ${tableName} values (1, '{"1" : 1, "2" : 2, "3" : 3, "4": "4", "a" : 1, "aa":2, "aaa" : 3,"aaaaaa":1234, ".a." : 1}')""" + sql """insert into ${tableName} values (1, '{"1" : 1, "2" : 2, "3" : 3, "4": "4", "a" : 1, "aa":2, "aaa" : 3,"aaaaaa":1234, ".a." : 1}')""" + sql """insert into ${tableName} values (1, '{"1" : 1, "2" : 2, "3" : 3, "4": "4", "a" : 1, "aa":2, "aaa" : 3,"aaaaaa":1234, ".a." : 1}')""" + qt_sql """select v['aaaa'] from ${tableName}""" + qt_sql """select v['aaa'] from ${tableName}""" + qt_sql """select v['aa'] from ${tableName}""" + qt_sql """select v['1'] from ${tableName}""" } finally { // try_sql("DROP TABLE IF EXISTS ${tableName}") GetDebugPoint().disableDebugPointForAllBEs("variant_column_writer_impl._get_subcolumn_paths_from_stats")