Skip to content

Commit

Permalink
[enhancement](compaction) optimizing memory usage for compaction (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
luwei16 authored Jul 2, 2024
1 parent 277de79 commit 6969ad0
Show file tree
Hide file tree
Showing 23 changed files with 469 additions and 42 deletions.
10 changes: 10 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() {
return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction");
}

int score = 0;
int rowset_cnt = 0;
while (rowset_cnt < _input_rowsets.size()) {
score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
if (score > config::base_compaction_max_compaction_score) {
break;
}
}
_input_rowsets.resize(rowset_cnt);

// 1. cumulative rowset must reach base_compaction_min_rowset_num threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
Expand Down
13 changes: 11 additions & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,20 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
return st;
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
config::cumulative_compaction_max_deltas_factor,
config::cumulative_compaction_min_deltas + 1);
}

size_t compaction_score = 0;
auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy();
_engine.cumu_compaction_policy(compaction_policy)
->pick_input_rowsets(cloud_tablet(), candidate_rowsets,
config::cumulative_compaction_max_deltas,
->pick_input_rowsets(cloud_tablet(), candidate_rowsets, max_score,
config::cumulative_compaction_min_deltas, &_input_rowsets,
&_last_delete_version, &compaction_score);

Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1");

DEFINE_Bool(enable_base_compaction_idle_sched, "true");
DEFINE_mInt64(base_compaction_min_rowset_num, "5");
DEFINE_mInt64(base_compaction_max_compaction_score, "20");
DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");

Expand Down Expand Up @@ -416,6 +417,7 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64");
// cumulative compaction policy: min and max delta file's number
DEFINE_mInt64(cumulative_compaction_min_deltas, "5");
DEFINE_mInt64(cumulative_compaction_max_deltas, "1000");
DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10");

// This config can be set to limit thread number in multiget thread pool.
DEFINE_mInt32(multi_get_max_threads, "10");
Expand Down Expand Up @@ -1317,6 +1319,10 @@ DEFINE_Bool(enable_file_logger, "true");
// The minimum row group size when exporting Parquet files. default 128MB
DEFINE_Int64(min_row_group_size, "134217728");

DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824");

DEFINE_mInt64(compaction_batch_size, "-1");

// If set to false, the parquet reader will not use page index to filter data.
// This is only for debug purpose, in case sometimes the page index
// filter wrong data.
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ DECLARE_mInt32(max_single_replica_compaction_threads);

DECLARE_Bool(enable_base_compaction_idle_sched);
DECLARE_mInt64(base_compaction_min_rowset_num);
DECLARE_mInt64(base_compaction_max_compaction_score);
DECLARE_mDouble(base_compaction_min_data_ratio);
DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);

Expand Down Expand Up @@ -468,6 +469,7 @@ DECLARE_mInt64(compaction_min_size_mbytes);
// cumulative compaction policy: min and max delta file's number
DECLARE_mInt64(cumulative_compaction_min_deltas);
DECLARE_mInt64(cumulative_compaction_max_deltas);
DECLARE_mInt32(cumulative_compaction_max_deltas_factor);

// This config can be set to limit thread number in multiget thread pool.
DECLARE_mInt32(multi_get_max_threads);
Expand Down Expand Up @@ -1403,6 +1405,10 @@ DECLARE_Bool(enable_file_logger);
// The minimum row group size when exporting Parquet files.
DECLARE_Int64(min_row_group_size);

DECLARE_mInt64(compaction_memory_bytes_limit);

DECLARE_mInt64(compaction_batch_size);

DECLARE_mBool(enable_parquet_page_index);

#ifdef BE_TEST
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ Status BaseCompaction::pick_rowsets_to_compact() {
"situation, no need to do base compaction.");
}

int score = 0;
int rowset_cnt = 0;
while (rowset_cnt < _input_rowsets.size()) {
score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
if (score > config::base_compaction_max_compaction_score) {
break;
}
}
_input_rowsets.resize(rowset_cnt);

// 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>

#include "common/status.h"
#include "olap/iterators.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
Expand Down Expand Up @@ -299,6 +300,10 @@ class BaseTablet {
std::atomic<int64_t> read_block_count = 0;
std::atomic<int64_t> write_count = 0;
std::atomic<int64_t> compaction_count = 0;

std::mutex sample_info_lock;
std::vector<CompactionSampleInfo> sample_infos;
Status last_compaction_status = Status::OK();
};

} /* namespace doris */
15 changes: 14 additions & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ void Compaction::init_profile(const std::string& label) {
_merge_rowsets_latency_timer = ADD_TIMER(_profile, "merge_rowsets_latency");
}

int64_t Compaction::merge_way_num() {
int64_t way_num = 0;
for (auto&& rowset : _input_rowsets) {
way_num += rowset->rowset_meta()->get_merge_way_num();
}

return way_num;
}

Status Compaction::merge_input_rowsets() {
std::vector<RowsetReaderSharedPtr> input_rs_readers;
input_rs_readers.reserve(_input_rowsets.size());
Expand All @@ -170,19 +179,23 @@ Status Compaction::merge_input_rowsets() {
_stats.rowid_conversion = &_rowid_conversion;
}

int64_t way_num = merge_way_num();

Status res;
{
SCOPED_TIMER(_merge_rowsets_latency_timer);
if (_is_vertical) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(),
get_avg_segment_rows(), &_stats);
get_avg_segment_rows(), way_num, &_stats);
} else {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(), &_stats);
}
}

_tablet->last_compaction_status = res;

if (!res.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->tablet_id()
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class Compaction {

void _load_segment_to_cache();

int64_t merge_way_num();

// the root tracker for this compaction
std::shared_ptr<MemTrackerLimiter> _mem_tracker;

Expand Down
15 changes: 12 additions & 3 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,20 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
<< ", tablet=" << _tablet->tablet_id();
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
if (tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
config::cumulative_compaction_max_deltas_factor,
config::cumulative_compaction_min_deltas + 1);
}

size_t compaction_score = 0;
tablet()->cumulative_compaction_policy()->pick_input_rowsets(
tablet(), candidate_rowsets, config::cumulative_compaction_max_deltas,
config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
&compaction_score, _allow_delete_in_cumu_compaction);
tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
&_input_rowsets, &_last_delete_version, &compaction_score,
_allow_delete_in_cumu_compaction);

// Cumulative compaction will process with at least 1 rowset.
// So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>():
Expand Down
15 changes: 14 additions & 1 deletion be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <cstddef>
#include <memory>

#include "common/status.h"
Expand Down Expand Up @@ -121,6 +122,12 @@ class StorageReadOptions {
size_t topn_limit = 0;
};

struct CompactionSampleInfo {
int64_t bytes = 0;
int64_t rows = 0;
int64_t group_data_size;
};

class RowwiseIterator;
using RowwiseIteratorUPtr = std::unique_ptr<RowwiseIterator>;
class RowwiseIterator {
Expand All @@ -133,7 +140,13 @@ class RowwiseIterator {
// Input options may contain scan range in which this scan.
// Return Status::OK() if init successfully,
// Return other error otherwise
virtual Status init(const StorageReadOptions& opts) = 0;
virtual Status init(const StorageReadOptions& opts) {
return Status::NotSupported("to be implemented");
}

virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) {
return Status::NotSupported("to be implemented");
}

// If there is any valid data, this function will load data
// into input batch with Status::OK() returned
Expand Down
67 changes: 63 additions & 4 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <algorithm>
#include <iterator>
#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <shared_mutex>
Expand All @@ -33,7 +34,9 @@

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/base_tablet.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowid_conversion.h"
Expand All @@ -43,6 +46,7 @@
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_reader.h"
#include "olap/utils.h"
#include "util/slice.h"
Expand Down Expand Up @@ -241,7 +245,8 @@ Status Merger::vertical_compact_one_group(
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
std::vector<uint32_t> key_group_cluster_key_idxes) {
std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
CompactionSampleInfo* sample_info) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
vectorized::VerticalBlockReader reader(row_source_buf);
Expand Down Expand Up @@ -279,7 +284,8 @@ Status Merger::vertical_compact_one_group(

reader_params.return_columns = column_group;
reader_params.origin_return_columns = &reader_params.return_columns;
RETURN_IF_ERROR(reader.init(reader_params));
reader_params.batch_size = batch_size;
RETURN_IF_ERROR(reader.init(reader_params, sample_info));

if (reader_params.record_rowids) {
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
Expand Down Expand Up @@ -385,6 +391,55 @@ Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType reader_t
return Status::OK();
}

int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) {
std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
CompactionSampleInfo info = tablet->sample_infos[group_index];
if (way_cnt <= 0) {
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " way cnt: " << way_cnt;
return 4096 - 32;
}
int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt;
if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) {
block_mem_limit /= 4;
}

int64_t group_data_size = 0;
if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) {
float smoothing_factor = 0.5;
group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) +
info.bytes / info.rows * smoothing_factor);
tablet->sample_infos[group_index].group_data_size = group_data_size;
} else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) {
group_data_size = info.group_data_size;
} else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) {
group_data_size = info.bytes / info.rows;
tablet->sample_infos[group_index].group_data_size = group_data_size;
} else {
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " group data size: " << info.group_data_size
<< " row num: " << info.rows << " consume bytes: " << info.bytes;
return 1024 - 32;
}

if (group_data_size <= 0) {
LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " unexpected group data size: " << group_data_size;
return 4096 - 32;
}

tablet->sample_infos[group_index].bytes = 0;
tablet->sample_infos[group_index].rows = 0;

int64_t batch_size = block_mem_limit / group_data_size;
int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L);
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id()
<< " group data size: " << info.group_data_size << " row num: " << info.rows
<< " consume bytes: " << info.bytes << " way cnt: " << way_cnt
<< " batch size: " << res;
return res;
}

// steps to do vertical merge:
// 1. split columns into column groups
// 2. compact groups one by one, generate a row_source_buf when compact key group
Expand All @@ -394,7 +449,7 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
const TabletSchema& tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output) {
int64_t merge_way_num, Statistics* stats_output) {
LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
std::vector<std::vector<uint32_t>> column_groups;
vertical_split_columns(tablet_schema, &column_groups);
Expand All @@ -405,14 +460,18 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t

vectorized::RowSourcesBuffer row_sources_buf(
tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type);
tablet->sample_infos.resize(column_groups.size(), {0, 0, 0});
// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
int64_t batch_size = config::compaction_batch_size != -1
? config::compaction_batch_size
: estimate_batch_size(i, tablet, merge_way_num);
RETURN_IF_ERROR(vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
key_group_cluster_key_idxes));
key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i])));
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "common/status.h"
#include "io/io_common.h"
#include "olap/iterators.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/tablet_fwd.h"

Expand Down Expand Up @@ -59,7 +60,7 @@ class Merger {
static Status vertical_merge_rowsets(
BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num,
Statistics* stats_output);

// for vertical compaction
Expand All @@ -71,7 +72,8 @@ class Merger {
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
std::vector<uint32_t> key_group_cluster_key_idxes);
std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
CompactionSampleInfo* sample_info);

// for segcompaction
static Status vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type,
Expand Down
Loading

0 comments on commit 6969ad0

Please sign in to comment.