Skip to content

Commit

Permalink
Merge branch 'branch-2.1' into pick-var-rf
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon authored Jul 10, 2024
2 parents 8dc4cd8 + f65d1c4 commit e787a3d
Show file tree
Hide file tree
Showing 231 changed files with 5,393 additions and 3,432 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ docker/thirdparties/docker-compose/hive/scripts/paimon1
fe_plugins/output
fe_plugins/**/.factorypath

docker/thirdparties/docker-compose/hive/scripts/data/*/*/data

fs_brokers/apache_hdfs_broker/src/main/resources/
fs_brokers/apache_hdfs_broker/src/main/thrift/

Expand Down
7 changes: 1 addition & 6 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,7 @@ header:
- "docs/package-lock.json"
- "regression-test/script/README"
- "regression-test/suites/load_p0/stream_load/data"
- "docker/thirdparties/docker-compose/hive/scripts/README"
- "docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql"
- "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_orc.hql"
- "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_parquet.hql"
- "docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/"
- "docker/thirdparties/docker-compose/hive/scripts/data/**"
- "docker/thirdparties/docker-compose/hive/scripts/**"
- "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
- "conf/mysql_ssl_default_certificate/*"
- "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
Expand Down
1 change: 0 additions & 1 deletion be/cmake/thirdparty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ add_thirdparty(crypto)
add_thirdparty(openssl LIBNAME "lib/libssl.a")
add_thirdparty(leveldb)
add_thirdparty(jemalloc LIBNAME "lib/libjemalloc_doris.a")
add_thirdparty(jemalloc_arrow LIBNAME "lib/libjemalloc_arrow.a")

if (WITH_MYSQL)
add_thirdparty(mysql LIBNAME "lib/libmysqlclient.a")
Expand Down
6 changes: 4 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ DEFINE_Validator(compaction_task_num_per_disk,
[](const int config) -> bool { return config >= 2; });
DEFINE_Validator(compaction_task_num_per_fast_disk,
[](const int config) -> bool { return config >= 2; });
DEFINE_Validator(low_priority_compaction_task_num_per_disk,
[](const int config) -> bool { return config >= 2; });

// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
Expand All @@ -448,8 +450,8 @@ DEFINE_mInt64(pick_rowset_to_compact_interval_sec, "86400");

// Compaction priority schedule
DEFINE_mBool(enable_compaction_priority_scheduling, "true");
DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "1");
DEFINE_mDouble(low_priority_tablet_version_num_ratio, "0.7");
DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "2");
DEFINE_mInt32(low_priority_compaction_score_threshold, "200");

// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
DEFINE_Int32(max_meta_checkpoint_threads, "-1");
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ DECLARE_mInt64(pick_rowset_to_compact_interval_sec);
// Compaction priority schedule
DECLARE_mBool(enable_compaction_priority_scheduling);
DECLARE_mInt32(low_priority_compaction_task_num_per_disk);
DECLARE_mDouble(low_priority_tablet_version_num_ratio);
DECLARE_mInt32(low_priority_compaction_score_threshold);

// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
DECLARE_Int32(max_meta_checkpoint_threads);
Expand Down
14 changes: 6 additions & 8 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,14 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r

Status MemTable::insert(const vectorized::Block* input_block,
const std::vector<uint32_t>& row_idxs) {
vectorized::Block target_block = *input_block;
target_block = input_block->copy_block(_column_offset);
if (_is_first_insertion) {
_is_first_insertion = false;
auto cloneBlock = target_block.clone_without_columns();
auto cloneBlock = input_block->clone_without_columns(&_column_offset);
_input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
_vec_row_comparator->set_block(&_input_mutable_block);
_output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
if (_keys_type != KeysType::DUP_KEYS) {
_init_agg_functions(&target_block);
_init_agg_functions(input_block);
}
if (_tablet_schema->has_sequence_col()) {
if (_is_partial_update) {
Expand All @@ -210,11 +208,11 @@ Status MemTable::insert(const vectorized::Block* input_block,
auto num_rows = row_idxs.size();
size_t cursor_in_mutableblock = _input_mutable_block.rows();
auto block_size0 = _input_mutable_block.allocated_bytes();
RETURN_IF_ERROR(_input_mutable_block.add_rows(&target_block, row_idxs.data(),
row_idxs.data() + num_rows));
RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(),
row_idxs.data() + num_rows, &_column_offset));
auto block_size1 = _input_mutable_block.allocated_bytes();
g_memtable_input_block_allocated_size << block_size1 - block_size0;
auto input_size = size_t(target_block.bytes() * num_rows / target_block.rows() *
auto input_size = size_t(input_block->bytes() * num_rows / input_block->rows() *
config::memtable_insert_memory_ratio);
_mem_usage += input_size;
_insert_mem_tracker->consume(input_size);
Expand Down Expand Up @@ -348,7 +346,7 @@ Status MemTable::_sort_by_cluster_keys() {
row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos);
}
return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
row_pos_vec.data() + in_block.rows());
row_pos_vec.data() + in_block.rows(), &_column_offset);
}

void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
Expand Down
88 changes: 49 additions & 39 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,45 @@ void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest*
response->mutable_status()->set_status_code(0);
}

bool need_generate_compaction_tasks(int count, int thread_per_disk, CompactionType compaction_type,
bool all_base) {
if (count >= thread_per_disk) {
// Return if no available slot
return false;
} else if (count >= thread_per_disk - 1) {
// Only one slot left, check if it can be assigned to base compaction task.
if (compaction_type == CompactionType::BASE_COMPACTION) {
if (all_base) {
return false;
}
}
}
return true;
}

int get_concurrent_per_disk(int max_score, int thread_per_disk) {
if (!config::enable_compaction_priority_scheduling) {
return thread_per_disk;
}

double load_average = 0;
if (DorisMetrics::instance()->system_metrics() != nullptr) {
load_average = DorisMetrics::instance()->system_metrics()->get_load_average_1_min();
}
int num_cores = doris::CpuInfo::num_cores();
bool cpu_usage_high = load_average > num_cores * 0.8;

auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;

if (max_score <= config::low_priority_compaction_score_threshold &&
(cpu_usage_high || memory_usage_high)) {
return config::low_priority_compaction_task_num_per_disk;
}

return thread_per_disk;
}

std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
_update_cumulative_compaction_policy();
Expand Down Expand Up @@ -870,22 +909,11 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
int count = copied_cumu_map[data_dir].size() + copied_base_map[data_dir].size();
int thread_per_disk = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk
: config::compaction_task_num_per_disk;
if (count >= thread_per_disk) {
// Return if no available slot
need_pick_tablet = false;
if (!check_score) {
continue;
}
} else if (count >= thread_per_disk - 1) {
// Only one slot left, check if it can be assigned to base compaction task.
if (compaction_type == CompactionType::BASE_COMPACTION) {
if (copied_cumu_map[data_dir].empty()) {
need_pick_tablet = false;
if (!check_score) {
continue;
}
}
}

need_pick_tablet = need_generate_compaction_tasks(count, thread_per_disk, compaction_type,
copied_cumu_map[data_dir].empty());
if (!need_pick_tablet && !check_score) {
continue;
}

// Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
Expand All @@ -898,6 +926,9 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
? copied_cumu_map[data_dir]
: copied_base_map[data_dir],
&disk_max_score, _cumulative_compaction_policies);
int concurrent_num = get_concurrent_per_disk(disk_max_score, thread_per_disk);
need_pick_tablet = need_generate_compaction_tasks(
count, concurrent_num, compaction_type, copied_cumu_map[data_dir].empty());
if (tablet != nullptr) {
if (need_pick_tablet) {
tablets_compaction.emplace_back(tablet);
Expand Down Expand Up @@ -1001,17 +1032,6 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
int64_t permits = 0;
Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet,
compaction, permits);
bool is_low_priority_task = [&]() {
// Can add more strategies to determine whether a task is a low priority task in the future
if (!config::enable_compaction_priority_scheduling) {
return false;
}
if (tablet->version_count() >=
(config::max_tablet_version_num * config::low_priority_tablet_version_num_ratio)) {
return false;
}
return !force;
}();
if (st.ok() && permits > 0) {
if (!force) {
_permit_limiter.request(permits);
Expand All @@ -1021,18 +1041,8 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
? _cumu_compaction_thread_pool
: _base_compaction_thread_pool;
auto st = thread_pool->submit_func([tablet, compaction = std::move(compaction),
compaction_type, permits, force, is_low_priority_task,
this]() {
if (is_low_priority_task && !_increase_low_priority_task_nums(tablet->data_dir())) {
VLOG_DEBUG << "skip low priority compaction task for tablet: "
<< tablet->tablet_id();
// Todo: push task back
} else {
tablet->execute_compaction(*compaction);
if (is_low_priority_task) {
_decrease_low_priority_task_nums(tablet->data_dir());
}
}
compaction_type, permits, force, this]() {
tablet->execute_compaction(*compaction);
if (!force) {
_permit_limiter.release(permits);
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/single_replica_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,12 @@ Status SingleReplicaCompaction::_finish_clone(const string& clone_dir,
}
// clear clone dir
std::filesystem::path clone_dir_path(clone_dir);
std::filesystem::remove_all(clone_dir_path);
std::error_code ec;
std::filesystem::remove_all(clone_dir_path, ec);
if (ec) {
LOG(WARNING) << "failed to remove=" << clone_dir_path << " msg=" << ec.message();
return Status::IOError("failed to remove {}, due to {}", clone_dir, ec.message());
}
LOG(INFO) << "finish to clone data, clear downloaded data. res=" << res
<< ", tablet=" << _tablet->tablet_id() << ", clone_dir=" << clone_dir;
return res;
Expand Down
20 changes: 0 additions & 20 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1465,26 +1465,6 @@ Status StorageEngine::_persist_broken_paths() {
return Status::OK();
}

bool StorageEngine::_increase_low_priority_task_nums(DataDir* dir) {
if (!config::enable_compaction_priority_scheduling) {
return true;
}
std::lock_guard l(_low_priority_task_nums_mutex);
if (_low_priority_task_nums[dir] < config::low_priority_compaction_task_num_per_disk) {
_low_priority_task_nums[dir]++;
return true;
}
return false;
}

void StorageEngine::_decrease_low_priority_task_nums(DataDir* dir) {
if (config::enable_compaction_priority_scheduling) {
std::lock_guard l(_low_priority_task_nums_mutex);
_low_priority_task_nums[dir]--;
DCHECK(_low_priority_task_nums[dir] >= 0);
}
}

int CreateTabletIdxCache::get_index(const std::string& key) {
auto* lru_handle = lookup(key);
if (lru_handle) {
Expand Down
16 changes: 14 additions & 2 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,13 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re
/// 2. Call _finish_xx_clone() to revise the tablet meta.
Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_dir, int64_t version,
bool is_incremental_clone) {
Defer remove_clone_dir {[&]() { std::filesystem::remove_all(clone_dir); }};
Defer remove_clone_dir {[&]() {
std::error_code ec;
std::filesystem::remove_all(clone_dir, ec);
if (ec) {
LOG(WARNING) << "failed to remove=" << clone_dir << " msg=" << ec.message();
}
}};

// check clone dir existed
bool exists = true;
Expand Down Expand Up @@ -654,7 +660,13 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d
bool contain_binlog = false;
RowsetBinlogMetasPB rowset_binlog_metas_pb;
if (binlog_metas_file_exists) {
auto binlog_meta_filesize = std::filesystem::file_size(binlog_metas_file);
std::error_code ec;
auto binlog_meta_filesize = std::filesystem::file_size(binlog_metas_file, ec);
if (ec) {
LOG(WARNING) << "get file size error" << ec.message();
return Status::IOError("can't retrive file_size of {}, due to {}", binlog_metas_file,
ec.message());
}
if (binlog_meta_filesize > 0) {
contain_binlog = true;
RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb));
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/cgroup_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ Status CGroupUtil::find_cgroup_mem_info(std::string* file_path) {
}
string cgroup_path;
RETURN_IF_ERROR(find_abs_cgroup_path("memory", &cgroup_path));
*file_path = cgroup_path + "/memory.meminfo";
*file_path = cgroup_path + "/memory.stat";
return Status::OK();
}

Expand Down
8 changes: 4 additions & 4 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ void MemInfo::refresh_proc_meminfo() {
if (fields.size() < 2) {
continue;
}
std::string key = fields[0].substr(0, fields[0].size() - 1);
std::string key = fields[0].substr(0, fields[0].size());

StringParser::ParseResult result;
auto mem_value = StringParser::string_to_int<int64_t>(fields[1].data(),
Expand All @@ -449,19 +449,19 @@ void MemInfo::refresh_proc_meminfo() {
// https://serverfault.com/questions/902009/the-memory-usage-reported-in-cgroup-differs-from-the-free-command
// memory.usage_in_bytes ~= free.used + free.(buff/cache) - (buff)
// so, memory.usage_in_bytes - memory.meminfo["Cached"]
_s_cgroup_mem_usage = cgroup_mem_usage - _s_cgroup_mem_info_bytes["Cached"];
_s_cgroup_mem_usage = cgroup_mem_usage - _s_cgroup_mem_info_bytes["cache"];
// wait 10s, 100 * 100ms, avoid too frequently.
_s_cgroup_mem_refresh_wait_times = -100;
LOG(INFO) << "Refresh cgroup memory win, refresh again after 10s, cgroup mem limit: "
<< _s_cgroup_mem_limit << ", cgroup mem usage: " << _s_cgroup_mem_usage
<< ", cgroup mem info cached: " << _s_cgroup_mem_info_bytes["Cached"];
<< ", cgroup mem info cached: " << _s_cgroup_mem_info_bytes["cache"];
} else {
// find cgroup failed, wait 300s, 1000 * 100ms.
_s_cgroup_mem_refresh_wait_times = -3000;
LOG(INFO)
<< "Refresh cgroup memory failed, refresh again after 300s, cgroup mem limit: "
<< _s_cgroup_mem_limit << ", cgroup mem usage: " << _s_cgroup_mem_usage
<< ", cgroup mem info cached: " << _s_cgroup_mem_info_bytes["Cached"];
<< ", cgroup mem info cached: " << _s_cgroup_mem_info_bytes["cache"];
}
} else {
if (config::enable_use_cgroup_memory_info) {
Expand Down
8 changes: 8 additions & 0 deletions be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,14 @@ void SystemMetrics::get_disks_io_time(std::map<std::string, int64_t>* map) {
}
}

double SystemMetrics::get_load_average_1_min() {
if (_load_average_metrics) {
return _load_average_metrics->load_average_1_minutes->value();
} else {
return 0;
}
}

void SystemMetrics::get_network_traffic(std::map<std::string, int64_t>* send_map,
std::map<std::string, int64_t>* rcv_map) {
send_map->clear();
Expand Down
2 changes: 2 additions & 0 deletions be/src/util/system_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class SystemMetrics {
const std::map<std::string, int64_t>& lst_rcv_map,
int64_t interval_sec, int64_t* send_rate, int64_t* rcv_rate);

double get_load_average_1_min();

void update_max_disk_io_util_percent(const std::map<std::string, int64_t>& lst_value,
int64_t interval_sec);
void update_max_network_send_bytes_rate(int64_t max_send_bytes_rate);
Expand Down
Loading

0 comments on commit e787a3d

Please sign in to comment.