From 6969ad0596b1e1b16f0abaa722545a5220860fc1 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Tue, 2 Jul 2024 21:43:15 +0800 Subject: [PATCH] [enhancement](compaction) optimizing memory usage for compaction (#37099) --- be/src/cloud/cloud_base_compaction.cpp | 10 ++ be/src/cloud/cloud_cumulative_compaction.cpp | 13 +- be/src/common/config.cpp | 6 + be/src/common/config.h | 6 + be/src/olap/base_compaction.cpp | 10 ++ be/src/olap/base_tablet.h | 5 + be/src/olap/compaction.cpp | 15 +- be/src/olap/compaction.h | 2 + be/src/olap/cumulative_compaction.cpp | 15 +- be/src/olap/iterators.h | 15 +- be/src/olap/merger.cpp | 67 ++++++++- be/src/olap/merger.h | 6 +- be/src/olap/rowset/rowset_meta.h | 15 ++ be/src/olap/rowset/segcompaction.cpp | 2 +- be/src/olap/tablet_reader.h | 2 + be/src/vec/olap/vertical_block_reader.cpp | 24 ++- be/src/vec/olap/vertical_block_reader.h | 3 +- be/src/vec/olap/vertical_merge_iterator.cpp | 29 ++-- be/src/vec/olap/vertical_merge_iterator.h | 25 +++- be/test/olap/base_compaction_test.cpp | 84 +++++++++++ be/test/olap/rowid_conversion_test.cpp | 6 +- be/test/vec/olap/vertical_compaction_test.cpp | 14 +- .../compaction_width_array_column.groovy | 137 ++++++++++++++++++ 23 files changed, 469 insertions(+), 42 deletions(-) create mode 100644 be/test/olap/base_compaction_test.cpp create mode 100644 regression-test/suites/compaction/compaction_width_array_column.groovy diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index d4a86743a488c7..4ceab8eb6e39b5 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -163,6 +163,16 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() { return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction"); } + int score = 0; + int rowset_cnt = 0; + while (rowset_cnt < _input_rowsets.size()) { + score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score(); + if (score > config::base_compaction_max_compaction_score) { + break; + } + } + _input_rowsets.resize(rowset_cnt); + // 1. cumulative rowset must reach base_compaction_min_rowset_num threshold if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { VLOG_NOTICE << "satisfy the base compaction policy. 
tablet=" << _tablet->tablet_id() diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index de318f979a5909..2a26b1b294b58e 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -354,11 +354,20 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() { return st; } + int64_t max_score = config::cumulative_compaction_max_deltas; + auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); + bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8; + if (cloud_tablet()->last_compaction_status.is() || + memory_usage_high) { + max_score = std::max(config::cumulative_compaction_max_deltas / + config::cumulative_compaction_max_deltas_factor, + config::cumulative_compaction_min_deltas + 1); + } + size_t compaction_score = 0; auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy(); _engine.cumu_compaction_policy(compaction_policy) - ->pick_input_rowsets(cloud_tablet(), candidate_rowsets, - config::cumulative_compaction_max_deltas, + ->pick_input_rowsets(cloud_tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, &compaction_score); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 92303473ad6f92..799427e2226c27 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -386,6 +386,7 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1"); DEFINE_Bool(enable_base_compaction_idle_sched, "true"); DEFINE_mInt64(base_compaction_min_rowset_num, "5"); +DEFINE_mInt64(base_compaction_max_compaction_score, "20"); DEFINE_mDouble(base_compaction_min_data_ratio, "0.3"); DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024"); @@ -416,6 +417,7 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64"); // cumulative compaction policy: min and max delta file's number DEFINE_mInt64(cumulative_compaction_min_deltas, "5"); DEFINE_mInt64(cumulative_compaction_max_deltas, "1000"); +DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10"); // This config can be set to limit thread number in multiget thread pool. DEFINE_mInt32(multi_get_max_threads, "10"); @@ -1317,6 +1319,10 @@ DEFINE_Bool(enable_file_logger, "true"); // The minimum row group size when exporting Parquet files. default 128MB DEFINE_Int64(min_row_group_size, "134217728"); +DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824"); + +DEFINE_mInt64(compaction_batch_size, "-1"); + // If set to false, the parquet reader will not use page index to filter data. // This is only for debug purpose, in case sometimes the page index // filter wrong data. 
diff --git a/be/src/common/config.h b/be/src/common/config.h index 1a9e3291db5adb..94c2ec5b0a7da8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -438,6 +438,7 @@ DECLARE_mInt32(max_single_replica_compaction_threads); DECLARE_Bool(enable_base_compaction_idle_sched); DECLARE_mInt64(base_compaction_min_rowset_num); +DECLARE_mInt64(base_compaction_max_compaction_score); DECLARE_mDouble(base_compaction_min_data_ratio); DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes); @@ -468,6 +469,7 @@ DECLARE_mInt64(compaction_min_size_mbytes); // cumulative compaction policy: min and max delta file's number DECLARE_mInt64(cumulative_compaction_min_deltas); DECLARE_mInt64(cumulative_compaction_max_deltas); +DECLARE_mInt32(cumulative_compaction_max_deltas_factor); // This config can be set to limit thread number in multiget thread pool. DECLARE_mInt32(multi_get_max_threads); @@ -1403,6 +1405,10 @@ DECLARE_Bool(enable_file_logger); // The minimum row group size when exporting Parquet files. DECLARE_Int64(min_row_group_size); +DECLARE_mInt64(compaction_memory_bytes_limit); + +DECLARE_mInt64(compaction_batch_size); + DECLARE_mBool(enable_parquet_page_index); #ifdef BE_TEST diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 436180c78ca87d..8be29383c1e9b1 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -151,6 +151,16 @@ Status BaseCompaction::pick_rowsets_to_compact() { "situation, no need to do base compaction."); } + int score = 0; + int rowset_cnt = 0; + while (rowset_cnt < _input_rowsets.size()) { + score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score(); + if (score > config::base_compaction_max_compaction_score) { + break; + } + } + _input_rowsets.resize(rowset_cnt); + // 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { VLOG_NOTICE << "satisfy the base compaction policy. 
tablet=" << _tablet->tablet_id() diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index dc5f488e04492c..4852a6cba9b7fd 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -22,6 +22,7 @@ #include #include "common/status.h" +#include "olap/iterators.h" #include "olap/partial_update_info.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/tablet_fwd.h" @@ -299,6 +300,10 @@ class BaseTablet { std::atomic read_block_count = 0; std::atomic write_count = 0; std::atomic compaction_count = 0; + + std::mutex sample_info_lock; + std::vector sample_infos; + Status last_compaction_status = Status::OK(); }; } /* namespace doris */ diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 37dcac5283ee98..b42c23f18742bc 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -149,6 +149,15 @@ void Compaction::init_profile(const std::string& label) { _merge_rowsets_latency_timer = ADD_TIMER(_profile, "merge_rowsets_latency"); } +int64_t Compaction::merge_way_num() { + int64_t way_num = 0; + for (auto&& rowset : _input_rowsets) { + way_num += rowset->rowset_meta()->get_merge_way_num(); + } + + return way_num; +} + Status Compaction::merge_input_rowsets() { std::vector input_rs_readers; input_rs_readers.reserve(_input_rowsets.size()); @@ -170,19 +179,23 @@ Status Compaction::merge_input_rowsets() { _stats.rowid_conversion = &_rowid_conversion; } + int64_t way_num = merge_way_num(); + Status res; { SCOPED_TIMER(_merge_rowsets_latency_timer); if (_is_vertical) { res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), &_stats); + get_avg_segment_rows(), way_num, &_stats); } else { res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), &_stats); } } + _tablet->last_compaction_status = res; + if (!res.ok()) { LOG(WARNING) << "fail to do " << compaction_name() << ". 
res=" << res << ", tablet=" << _tablet->tablet_id() diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 9ec1297c69cb0a..8e0c1099a20942 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -81,6 +81,8 @@ class Compaction { void _load_segment_to_cache(); + int64_t merge_way_num(); + // the root tracker for this compaction std::shared_ptr _mem_tracker; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 1e0f338da23978..2c7e654787a650 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -134,11 +134,20 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { << ", tablet=" << _tablet->tablet_id(); } + int64_t max_score = config::cumulative_compaction_max_deltas; + auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); + bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8; + if (tablet()->last_compaction_status.is() || memory_usage_high) { + max_score = std::max(config::cumulative_compaction_max_deltas / + config::cumulative_compaction_max_deltas_factor, + config::cumulative_compaction_min_deltas + 1); + } + size_t compaction_score = 0; tablet()->cumulative_compaction_policy()->pick_input_rowsets( - tablet(), candidate_rowsets, config::cumulative_compaction_max_deltas, - config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, - &compaction_score, _allow_delete_in_cumu_compaction); + tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas, + &_input_rowsets, &_last_delete_version, &compaction_score, + _allow_delete_in_cumu_compaction); // Cumulative compaction will process with at least 1 rowset. // So when there is no rowset being chosen, we should return Status::Error(): diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 330aa9e3475806..cbf8f1eca65ae2 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include "common/status.h" @@ -121,6 +122,12 @@ class StorageReadOptions { size_t topn_limit = 0; }; +struct CompactionSampleInfo { + int64_t bytes = 0; + int64_t rows = 0; + int64_t group_data_size; +}; + class RowwiseIterator; using RowwiseIteratorUPtr = std::unique_ptr; class RowwiseIterator { @@ -133,7 +140,13 @@ class RowwiseIterator { // Input options may contain scan range in which this scan. 
// Return Status::OK() if init successfully, // Return other error otherwise - virtual Status init(const StorageReadOptions& opts) = 0; + virtual Status init(const StorageReadOptions& opts) { + return Status::NotSupported("to be implemented"); + } + + virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) { + return Status::NotSupported("to be implemented"); + } // If there is any valid data, this function will load data // into input batch with Status::OK() returned diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index cecbeb163dd673..4c620d30252950 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -33,7 +34,9 @@ #include "common/config.h" #include "common/logging.h" +#include "common/status.h" #include "olap/base_tablet.h" +#include "olap/iterators.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowid_conversion.h" @@ -43,6 +46,7 @@ #include "olap/rowset/segment_v2/segment_writer.h" #include "olap/storage_engine.h" #include "olap/tablet.h" +#include "olap/tablet_fwd.h" #include "olap/tablet_reader.h" #include "olap/utils.h" #include "util/slice.h" @@ -241,7 +245,8 @@ Status Merger::vertical_compact_one_group( vectorized::RowSourcesBuffer* row_source_buf, const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output, - std::vector<uint32_t> key_group_cluster_key_idxes) { + std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size, + CompactionSampleInfo* sample_info) { // build tablet reader VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment; vectorized::VerticalBlockReader reader(row_source_buf); @@ -279,7 +284,8 @@ Status Merger::vertical_compact_one_group( reader_params.return_columns = column_group; reader_params.origin_return_columns = &reader_params.return_columns; - RETURN_IF_ERROR(reader.init(reader_params)); + reader_params.batch_size = batch_size; + RETURN_IF_ERROR(reader.init(reader_params, sample_info)); if (reader_params.record_rowids) { stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); @@ -385,6 +391,55 @@ Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType reader_t return Status::OK(); } +int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) { + std::unique_lock<std::mutex> lock(tablet->sample_info_lock); + CompactionSampleInfo info = tablet->sample_infos[group_index]; + if (way_cnt <= 0) { + LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " + << tablet->tablet_id() << " way cnt: " << way_cnt; + return 4096 - 32; + } + int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt; + if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) { + block_mem_limit /= 4; + } + + int64_t group_data_size = 0; + if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) { + float smoothing_factor = 0.5; + group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) + + info.bytes / info.rows * smoothing_factor); + tablet->sample_infos[group_index].group_data_size = group_data_size; + } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) { + group_data_size = info.group_data_size; + } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) { + group_data_size = info.bytes / info.rows; + tablet->sample_infos[group_index].group_data_size = group_data_size; + } else { + LOG(INFO) 
<< "estimate batch size for vertical compaction, tablet id: " + << tablet->tablet_id() << " group data size: " << info.group_data_size + << " row num: " << info.rows << " consume bytes: " << info.bytes; + return 1024 - 32; + } + + if (group_data_size <= 0) { + LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: " + << tablet->tablet_id() << " unexpected group data size: " << group_data_size; + return 4096 - 32; + } + + tablet->sample_infos[group_index].bytes = 0; + tablet->sample_infos[group_index].rows = 0; + + int64_t batch_size = block_mem_limit / group_data_size; + int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L); + LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id() + << " group data size: " << info.group_data_size << " row num: " << info.rows + << " consume bytes: " << info.bytes << " way cnt: " << way_cnt + << " batch size: " << res; + return res; +} + // steps to do vertical merge: // 1. split columns into column groups // 2. compact groups one by one, generate a row_source_buf when compact key group @@ -394,7 +449,7 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t const TabletSchema& tablet_schema, const std::vector& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, - Statistics* stats_output) { + int64_t merge_way_num, Statistics* stats_output) { LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id(); std::vector> column_groups; vertical_split_columns(tablet_schema, &column_groups); @@ -405,14 +460,18 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t vectorized::RowSourcesBuffer row_sources_buf( tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type); + tablet->sample_infos.resize(column_groups.size(), {0, 0, 0}); // compact group one by one for (auto i = 0; i < column_groups.size(); ++i) { VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); bool is_key = (i == 0); + int64_t batch_size = config::compaction_batch_size != -1 + ? 
config::compaction_batch_size + : estimate_batch_size(i, tablet, merge_way_num); RETURN_IF_ERROR(vertical_compact_one_group( tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf, src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output, - key_group_cluster_key_idxes)); + key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i]))); if (is_key) { RETURN_IF_ERROR(row_sources_buf.flush()); } diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index 5749f518136bd9..7513c90fbd1217 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -21,6 +21,7 @@ #include "common/status.h" #include "io/io_common.h" +#include "olap/iterators.h" #include "olap/rowset/rowset_fwd.h" #include "olap/tablet_fwd.h" @@ -59,7 +60,7 @@ class Merger { static Status vertical_merge_rowsets( BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema, const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, - RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, + RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num, Statistics* stats_output); // for vertical compaction @@ -71,7 +72,8 @@ class Merger { vectorized::RowSourcesBuffer* row_source_buf, const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output, - std::vector<uint32_t> key_group_cluster_key_idxes); + std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size, + CompactionSampleInfo* sample_info); // for segcompaction static Status vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type, diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 90b2ce48a0a5f0..aa20b5b1ef13ac 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -269,6 +269,21 @@ class RowsetMeta { return score; } + uint32_t get_merge_way_num() const { + uint32_t way_num = 0; + if (!is_segments_overlapping()) { + if (num_segments() == 0) { + way_num = 0; + } else { + way_num = 1; + } + } else { + way_num = num_segments(); + CHECK(way_num > 0); + } + return way_num; + } + void get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) const { for (const KeyBoundsPB& key_range : _rowset_meta_pb.segments_key_bounds()) { segments_key_bounds->push_back(key_range); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 22a7049aa8f3b4..95f2a945134b4c 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -101,7 +101,7 @@ Status SegcompactionWorker::_get_segcompaction_reader( reader_params.tablet = tablet; reader_params.return_columns = return_columns; reader_params.is_key_column_group = is_key; - return (*reader)->init(reader_params); + return (*reader)->init(reader_params, nullptr); } std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer( diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index a3cd3bd4a49577..c257ba007f531a 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -183,6 +183,8 @@ class TabletReader { void check_validation() const; std::string to_string() const; + + int64_t batch_size = -1; }; TabletReader() = default; diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index c4dda20f40f9a2..61d24e79f77004 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -25,6 +25,8 @@ #include #include "cloud/config.h" +#include 
"olap/compaction.h" +#include "olap/iterators.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowset/rowset.h" @@ -108,7 +110,8 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para return Status::OK(); } -Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) { +Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, + CompactionSampleInfo* sample_info) { std::vector iterator_init_flag; std::vector rowset_ids; std::vector* segment_iters_ptr = read_params.segment_iters_ptr; @@ -157,7 +160,10 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) // init collect iterator StorageReadOptions opts; opts.record_rowids = read_params.record_rowids; - RETURN_IF_ERROR(_vcollect_iter->init(opts)); + if (read_params.batch_size > 0) { + opts.block_row_max = read_params.batch_size; + } + RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info)); // In agg keys value columns compact, get first row for _init_agg_state if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) { @@ -204,13 +210,21 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) { } Status VerticalBlockReader::init(const ReaderParams& read_params) { + return init(read_params, nullptr); +} + +Status VerticalBlockReader::init(const ReaderParams& read_params, + CompactionSampleInfo* sample_info) { StorageReadOptions opts; - _reader_context.batch_size = opts.block_row_max; + if (read_params.batch_size > 0) { + _reader_context.batch_size = read_params.batch_size; + } else { + _reader_context.batch_size = opts.block_row_max; + } RETURN_IF_ERROR(TabletReader::init(read_params)); _arena = std::make_unique(); - - auto status = _init_collect_iter(read_params); + auto status = _init_collect_iter(read_params, sample_info); if (!status.ok()) [[unlikely]] { if (!config::is_cloud_mode()) { static_cast(_tablet.get())->report_error(status); diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h index 81ef8d7910077d..e1e8cfa1239b72 100644 --- a/be/src/vec/olap/vertical_block_reader.h +++ b/be/src/vec/olap/vertical_block_reader.h @@ -56,6 +56,7 @@ class VerticalBlockReader final : public TabletReader { // Initialize VerticalBlockReader with tablet, data version and fetch range. Status init(const ReaderParams& read_params) override; + Status init(const ReaderParams& read_params, CompactionSampleInfo* sample_info); Status next_block_with_aggregation(Block* block, bool* eof) override; @@ -79,7 +80,7 @@ class VerticalBlockReader final : public TabletReader { // to minimize the comparison time in merge heap. 
Status _unique_key_next_block(Block* block, bool* eof); - Status _init_collect_iter(const ReaderParams& read_params); + Status _init_collect_iter(const ReaderParams& read_params, CompactionSampleInfo* sample_info); Status _get_segment_iterators(const ReaderParams& read_params, std::vector<RowwiseIteratorUPtr>* segment_iters, diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index 3323492ee9015c..81cfc756d63562 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include "cloud/config.h" @@ -29,6 +30,7 @@ #include "common/logging.h" #include "io/cache/block_file_cache_factory.h" #include "olap/field.h" +#include "olap/iterators.h" #include "olap/olap_common.h" #include "vec/columns/column.h" #include "vec/common/string_ref.h" @@ -340,13 +342,18 @@ Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) { return Status::OK(); } -Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) { +Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts, + CompactionSampleInfo* sample_info) { if (LIKELY(_inited)) { return Status::OK(); } _block_row_max = opts.block_row_max; _record_rowids = opts.record_rowids; RETURN_IF_ERROR(_load_next_block()); + if (sample_info != nullptr) { + sample_info->bytes += bytes(); + sample_info->rows += rows(); + } if (valid()) { RETURN_IF_ERROR(advance()); } @@ -505,7 +512,8 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) { return Status::EndOfFile("no more data in segment"); } -Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) { +Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts, + CompactionSampleInfo* sample_info) { DCHECK(_origin_iters.size() == _iterator_init_flags.size()); _record_rowids = opts.record_rowids; if (_origin_iters.empty()) { @@ -533,7 +541,7 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) { for (size_t i = 0; i < num_iters; ++i) { if (_iterator_init_flags[i] || pre_iter_invalid) { auto& ctx = _ori_iter_ctx[i]; - RETURN_IF_ERROR(ctx->init(opts)); + RETURN_IF_ERROR(ctx->init(opts, sample_info)); if (!ctx->valid()) { pre_iter_invalid = true; continue; } @@ -606,7 +614,8 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) { return Status::EndOfFile("no more data in segment"); } -Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) { +Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts, + CompactionSampleInfo* sample_info) { DCHECK(_origin_iters.size() == _iterator_init_flags.size()); DCHECK(_keys_type == KeysType::DUP_KEYS); _record_rowids = opts.record_rowids; @@ -626,7 +635,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) { std::unique_ptr<VerticalMergeIteratorContext> ctx( new VerticalMergeIteratorContext(std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx)); - RETURN_IF_ERROR(ctx->init(opts)); + RETURN_IF_ERROR(ctx->init(opts, sample_info)); if (!ctx->valid()) { ++seg_order; continue; } @@ -667,7 +676,7 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) { uint16_t order = row_source.get_source_num(); auto& ctx = _origin_iter_ctx[order]; // init ctx and this ctx must be valid - RETURN_IF_ERROR(ctx->init(_opts)); + RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); DCHECK(ctx->valid()); if (UNLIKELY(ctx->is_first_row())) { @@ -701,7 +710,7 @@ Status 
VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef auto row_source = _row_sources_buf->current(); uint16_t order = row_source.get_source_num(); auto& ctx = _origin_iter_ctx[order]; - RETURN_IF_ERROR(ctx->init(_opts)); + RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); DCHECK(ctx->valid()); if (!ctx->valid()) { LOG(INFO) << "VerticalMergeIteratorContext not valid"; @@ -740,7 +749,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) { uint16_t order = _row_sources_buf->current().get_source_num(); DCHECK(order < _origin_iter_ctx.size()); auto& ctx = _origin_iter_ctx[order]; - RETURN_IF_ERROR(ctx->init(_opts)); + RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); DCHECK(ctx->valid()); if (!ctx->valid()) { LOG(INFO) << "VerticalMergeIteratorContext not valid"; @@ -763,7 +772,8 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) { return st; } -Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) { +Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts, + CompactionSampleInfo* sample_info) { if (_origin_iters.empty()) { return Status::OK(); } @@ -778,6 +788,7 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) { } _origin_iters.clear(); + _sample_info = sample_info; _block_row_max = opts.block_row_max; return Status::OK(); } diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h index f46a0446cf25a0..3751aa92c78b15 100644 --- a/be/src/vec/olap/vertical_merge_iterator.h +++ b/be/src/vec/olap/vertical_merge_iterator.h @@ -164,7 +164,7 @@ class VerticalMergeIteratorContext { ~VerticalMergeIteratorContext() = default; Status block_reset(const std::shared_ptr<Block>& block); - Status init(const StorageReadOptions& opts); + Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info = nullptr); bool compare(const VerticalMergeIteratorContext& rhs) const; Status copy_rows(Block* block, bool advanced = true); Status copy_rows(Block* block, size_t count); @@ -200,6 +200,22 @@ class VerticalMergeIteratorContext { return _block_row_locations[_index_in_block]; } + size_t bytes() { + if (_block) { + return _block->bytes(); + } else { + return 0; + } + } + + size_t rows() { + if (_block) { + return _block->rows(); + } else { + return 0; + } + } + private: // Load next block into _block Status _load_next_block(); @@ -255,7 +271,7 @@ class VerticalHeapMergeIterator : public RowwiseIterator { VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete; VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = delete; - Status init(const StorageReadOptions& opts) override; + Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; Status next_batch(Block* block) override; const Schema& schema() const override { return *_schema; } uint64_t merged_rows() const override { return _merged_rows; } @@ -321,7 +337,7 @@ class VerticalFifoMergeIterator : public RowwiseIterator { VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete; VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = delete; - Status init(const StorageReadOptions& opts) override; + Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; Status next_batch(Block* block) override; const Schema& schema() const override { return *_schema; } uint64_t merged_rows() const override { return _merged_rows; } @@ -367,7 +383,7 @@ class VerticalMaskMergeIterator : public RowwiseIterator { 
VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete; VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = delete; - Status init(const StorageReadOptions& opts) override; + Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; Status next_batch(Block* block) override; @@ -396,6 +412,7 @@ class VerticalMaskMergeIterator : public RowwiseIterator { size_t _filtered_rows = 0; RowSourcesBuffer* _row_sources_buf; StorageReadOptions _opts; + CompactionSampleInfo* _sample_info = nullptr; }; // segment merge iterator diff --git a/be/test/olap/base_compaction_test.cpp b/be/test/olap/base_compaction_test.cpp new file mode 100644 index 00000000000000..7d9abe54ed2163 --- /dev/null +++ b/be/test/olap/base_compaction_test.cpp @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/base_compaction.h" + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "gtest/gtest_pred_impl.h" +#include "olap/cumulative_compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" +#include "util/uid_util.h" + +namespace doris { + +class TestBaseCompaction : public testing::Test {}; + +static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping, + int data_size) { + auto rs_meta = std::make_shared<RowsetMeta>(); + rs_meta->set_rowset_type(BETA_ROWSET); // important + rs_meta->_rowset_meta_pb.set_start_version(version.first); + rs_meta->_rowset_meta_pb.set_end_version(version.second); + rs_meta->set_num_segments(num_segments); + rs_meta->set_segments_overlap(overlapping ? 
OVERLAPPING : NONOVERLAPPING); + rs_meta->set_total_disk_size(data_size); + RowsetSharedPtr rowset; + Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset); + if (!st.ok()) { + return nullptr; + } + return rowset; +} + +TEST_F(TestBaseCompaction, filter_input_rowset) { + StorageEngine engine({}); + TabletMetaSharedPtr tablet_meta; + tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, + TCompressionType::LZ4F)); + TabletSharedPtr tablet(new Tablet(engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY)); + tablet->_cumulative_point = 25; + BaseCompaction compaction(engine, tablet); + //std::vector rowsets; + + RowsetSharedPtr init_rs = create_rowset({0, 1}, 1, false, 0); + tablet->_rs_version_map.emplace(init_rs->version(), init_rs); + for (int i = 2; i < 30; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + tablet->_rs_version_map.emplace(rs->version(), rs); + } + Status st = compaction.pick_rowsets_to_compact(); + EXPECT_TRUE(st.ok()); + EXPECT_EQ(compaction._input_rowsets.front()->start_version(), 0); + EXPECT_EQ(compaction._input_rowsets.front()->end_version(), 1); + + EXPECT_EQ(compaction._input_rowsets.back()->start_version(), 21); + EXPECT_EQ(compaction._input_rowsets.back()->end_version(), 21); +} + +} // namespace doris diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 7c56710f2e8c81..5ae80398afb1b9 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -348,9 +348,9 @@ class TestRowIdConversion : public testing::TestWithParambuild(out_rowset)); @@ -598,7 +598,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, &stats); + output_rs_writer.get(), 100, num_segments, &stats); ASSERT_TRUE(s.ok()) << s; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -706,7 +706,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 10000, &stats); + output_rs_writer.get(), 10000, num_segments, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -815,7 +815,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, &stats); + input_rs_readers, output_rs_writer.get(), 100, num_segments, + &stats); ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -917,7 +918,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, &stats); + input_rs_readers, output_rs_writer.get(), 100, num_segments, + &stats); 
ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -1010,7 +1012,7 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, &stats); + output_rs_writer.get(), 100, num_segments, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); diff --git a/regression-test/suites/compaction/compaction_width_array_column.groovy b/regression-test/suites/compaction/compaction_width_array_column.groovy new file mode 100644 index 00000000000000..4e3fed354c7d84 --- /dev/null +++ b/regression-test/suites/compaction/compaction_width_array_column.groovy @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('compaction_width_array_column', "p2") { + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + def s3BucketName = getS3BucketName() + def random = new Random(); + + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def tableName = "column_witdh_array" + + def table_create_task = { table_name -> + // drop table if exists + sql """drop table if exists ${table_name}""" + // create table + def create_table = new File("""${context.file.parent}/ddl/${table_name}.sql""").text + create_table = create_table.replaceAll("\\\$\\{table\\_name\\}", table_name) + sql create_table + } + + def table_load_task = { table_name -> + uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + loadLabel = table_name + "_" + uniqueID + //loadLabel = table_name + '_load_5' + loadSql = new File("""${context.file.parent}/ddl/${table_name}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + loadSql = 
loadSql.replaceAll("\\$\\{table\\_name\\}", table_name) + nowloadSql = loadSql + s3WithProperties + try_sql nowloadSql + + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + logger.info("load result is ${stateResult}") + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + table_create_task(tableName) + table_load_task(tableName) + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + boolean isOverLap = true + int tryCnt = 0; + while (isOverLap && tryCnt < 3) { + isOverLap = false + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertEquals("success", compactJson.status.toLowerCase()) + } + + // wait for all compactions done + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List<String>) tabletJson.rowsets) { + logger.info("rowset info" + rowset) + String overLappingStr = rowset.split(" ")[3] + if (overLappingStr == "OVERLAPPING") { + isOverLap = true; + } + logger.info("is over lap " + isOverLap + " " + overLappingStr) + } + } + tryCnt++; + } + + assertFalse(isOverLap); +}
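Aside (not part of the patch): the truncation that `pick_rowsets_to_compact()` now applies in both `BaseCompaction` and `CloudBaseCompaction` reduces to a prefix scan over per-rowset compaction scores; the rowset that first pushes the running total past `base_compaction_max_compaction_score` is kept and everything after it is dropped. A minimal sketch with bare score values, assuming the cap mirrors the "20" default from config.cpp:

```cpp
// Standalone sketch (not BE code) of the score-capped input truncation.
#include <cstdint>
#include <iostream>
#include <vector>

constexpr int64_t kBaseCompactionMaxCompactionScore = 20; // config default

// Returns how many leading rowsets survive the cap.
size_t capped_input_size(const std::vector<int>& scores) {
    int64_t score = 0;
    size_t rowset_cnt = 0;
    while (rowset_cnt < scores.size()) {
        score += scores[rowset_cnt++];
        if (score > kBaseCompactionMaxCompactionScore) {
            break; // the rowset that crossed the cap stays in; the rest are cut
        }
    }
    return rowset_cnt;
}

int main() {
    // 29 non-overlapping single-segment rowsets of score 1 each, as built by
    // TestBaseCompaction::filter_input_rowset above.
    std::vector<int> scores(29, 1);
    std::cout << capped_input_size(scores) << "\n"; // 21
}
```

Feeding it the 29 single-segment, non-overlapping rowsets from `TestBaseCompaction::filter_input_rowset` (compaction score 1 each) keeps 21 of them, which is why that test expects the last picked rowset to end at version 21.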