Skip to content

Commit

Permalink
Merge branch 'master' into show_storage_policy
Browse files Browse the repository at this point in the history
  • Loading branch information
msridhar78 authored Dec 2, 2024
2 parents b1de3a8 + 303e275 commit 0d1d9db
Show file tree
Hide file tree
Showing 1,215 changed files with 21,216 additions and 6,502 deletions.
1 change: 0 additions & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ github:
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
- Build Broker
- ShellCheck
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- COMPILE (DORIS_COMPILE)
Expand Down
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ Status CloudBaseCompaction::prepare_compact() {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
Expand Down Expand Up @@ -320,6 +321,10 @@ Status CloudBaseCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
.base_compaction_cnt = _ms_base_compaction_cnt,
.cumulative_compaction_cnt = _ms_cumulative_compaction_cnt,
.cumulative_point = _ms_cumulative_point};
auto update_delete_bitmap_time_us = 0;
int64_t update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) &&
version == previous_publish_info.publish_version &&
_ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ Status CloudFullCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,17 +340,23 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
int64_t num_output_rows = 0;
int64_t size_output_rowsets = 0;
int64_t num_output_segments = 0;
int64_t index_size_output_rowsets = 0;
int64_t segment_size_output_rowsets = 0;
for (auto& rs : _output_rowsets) {
sc_job->add_txn_ids(rs->txn_id());
sc_job->add_output_versions(rs->end_version());
num_output_rows += rs->num_rows();
size_output_rowsets += rs->total_disk_size();
num_output_segments += rs->num_segments();
index_size_output_rowsets += rs->index_disk_size();
segment_size_output_rowsets += rs->data_disk_size();
}
sc_job->set_num_output_rows(num_output_rows);
sc_job->set_size_output_rowsets(size_output_rowsets);
sc_job->set_num_output_segments(num_output_segments);
sc_job->set_num_output_rowsets(_output_rowsets.size());
sc_job->set_index_size_output_rowsets(index_size_output_rowsets);
sc_job->set_segment_size_output_rowsets(segment_size_output_rowsets);
}
_output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
sc_job->set_output_cumulative_point(_output_cumulative_point);
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ int CloudTablet::delete_expired_stale_rowsets() {
}

for (int64_t path_id : path_ids) {
int start_version = -1;
int end_version = -1;
int64_t start_version = -1;
int64_t end_version = -1;
// delete stale versions in version graph
auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
for (auto& v_ts : version_path->timestamped_versions()) {
Expand Down
23 changes: 19 additions & 4 deletions be/src/common/cgroup_memory_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "common/status.h"
#include "util/cgroup_util.h"
#include "util/error_util.h"

namespace doris {

Expand Down Expand Up @@ -84,14 +85,23 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
: _mount_file_dir(std::move(mount_file_dir)) {}

Status read_memory_limit(int64_t* value) override {
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file((_mount_file_dir / "memory.max"),
value));
std::filesystem::path file_path = _mount_file_dir / "memory.max";
std::string line;
std::ifstream file_stream(file_path, std::ios::in);
getline(file_stream, line);
if (file_stream.fail() || file_stream.bad()) {
return Status::CgroupError("Error reading {}: {}", file_path.string(),
get_str_err_msg());
}
if (line == "max") {
*value = std::numeric_limits<int64_t>::max();
return Status::OK();
}
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(file_path, value));
return Status::OK();
}

Status read_memory_usage(int64_t* value) override {
// memory.current contains a single number
// the reason why we subtract it described here: https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(
(_mount_file_dir / "memory.current"), value));
std::unordered_map<std::string, int64_t> metrics_map;
Expand All @@ -100,7 +110,12 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
if (*value < metrics_map["inactive_file"]) {
return Status::CgroupError("CgroupsV2Reader read_memory_usage negative memory usage");
}
// the reason why we subtract inactive_file described here:
// https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
*value -= metrics_map["inactive_file"];
// Part of "slab" that might be reclaimed, such as dentries and inodes.
// https://arthurchiao.art/blog/cgroupv2-zh/
*value -= metrics_map["slab_reclaimable"];
return Status::OK();
}

Expand Down
16 changes: 14 additions & 2 deletions be/src/exec/schema_scanner/schema_tables_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
std::vector<int64_t> srcs(table_num);
for (int i = 0; i < table_num; ++i) {
const TTableStatus& tbl_status = _table_result.tables[i];
if (tbl_status.__isset.avg_row_length) {
if (tbl_status.__isset.data_length) {
srcs[i] = tbl_status.data_length;
datas[i] = srcs.data() + i;
} else {
Expand All @@ -248,7 +248,19 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
// max_data_length
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 10, null_datas)); }
// index_length
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, null_datas)); }
{
std::vector<int64_t> srcs(table_num);
for (int i = 0; i < table_num; ++i) {
const TTableStatus& tbl_status = _table_result.tables[i];
if (tbl_status.__isset.index_length) {
srcs[i] = tbl_status.index_length;
datas[i] = srcs.data() + i;
} else {
datas[i] = nullptr;
}
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas));
}
// data_free
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, null_datas)); }
// auto_increment
Expand Down
16 changes: 9 additions & 7 deletions be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,18 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
return Status::InternalError("bloomfilter_func is nullptr");
}
if (bloomfilter_func->_bloom_filter == nullptr) {
return Status::InternalError("bloomfilter_func->_bloom_filter is nullptr");
return Status::InternalError(
"bloomfilter_func->_bloom_filter is nullptr, bloomfilter_func->inited: {}",
bloomfilter_func->_inited);
}
// If `_inited` is false, there is no memory allocated in bloom filter and this is the first
// call for `merge` function. So we just reuse this bloom filter, and we don't need to
// allocate memory again.
if (!_inited) {
auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
if (_bloom_filter != nullptr) {
return Status::InternalError("_bloom_filter must is nullptr");
return Status::InternalError("_bloom_filter must is nullptr, inited: {}", _inited);
}
_bloom_filter = bloomfilter_func->_bloom_filter;
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
_inited = true;
light_copy(bloomfilter_func);
return Status::OK();
}
auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
Expand Down Expand Up @@ -207,7 +206,8 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {

bool contain_null() const {
if (!_bloom_filter) {
throw Exception(ErrorCode::INTERNAL_ERROR, "_bloom_filter is nullptr");
throw Exception(ErrorCode::INTERNAL_ERROR, "_bloom_filter is nullptr, inited: {}",
_inited);
}
return _bloom_filter->contain_null();
}
Expand All @@ -233,6 +233,8 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
uint16_t* offsets, int number,
bool is_parse_column) = 0;

bool inited() const { return _inited; }

private:
void _limit_length() {
if (_runtime_bloom_filter_min_size > 0) {
Expand Down
23 changes: 14 additions & 9 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1114,9 +1114,6 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
std::lock_guard l(*local_merge_filters->lock);
local_merge_filters->merge_size_times--;
local_merge_filters->local_merged_size += local_filter_size;
if (_has_local_target) {
set_synced_size(local_filter_size);
}
if (local_merge_filters->merge_size_times) {
return Status::OK();
} else {
Expand Down Expand Up @@ -1286,6 +1283,13 @@ PrimitiveType IRuntimeFilter::column_type() const {

void IRuntimeFilter::signal() {
DCHECK(is_consumer());

if (!_wrapper->is_ignored() && _wrapper->is_bloomfilter() &&
!_wrapper->get_bloomfilter()->inited()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored, rf: {}",
debug_string());
}

COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS));
_rf_state_atomic.store(RuntimeFilterState::READY);
if (!_filter_timer.empty()) {
Expand Down Expand Up @@ -1538,10 +1542,13 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_

std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, "
"build_bf_cardinality: {}, error_msg: {}",
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}, "
"build_bf_cardinality: {}, dependency: {}, synced_size: {}, has_local_target: {}, "
"has_remote_target: {},error_msg: [{}]",
_filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
_wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg);
_wrapper->_context->ignored, _wrapper->get_build_bf_cardinality(),
_dependency ? _dependency->debug_string() : "none", _synced_size, _has_local_target,
_has_remote_target, _wrapper->_context->err_msg);
}

Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
Expand Down Expand Up @@ -1594,9 +1601,7 @@ RuntimeFilterType IRuntimeFilter::get_real_type() {
}

bool IRuntimeFilter::need_sync_filter_size() {
return (type() == RuntimeFilterType::IN_OR_BLOOM_FILTER ||
type() == RuntimeFilterType::BLOOM_FILTER) &&
_wrapper->get_build_bf_cardinality() && !_is_broadcast_join;
return _wrapper->get_build_bf_cardinality() && !_is_broadcast_join;
}

void IRuntimeFilter::update_filter(std::shared_ptr<RuntimePredicateWrapper> wrapper,
Expand Down
10 changes: 7 additions & 3 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,13 @@ class IRuntimeFilter {
void set_finish_dependency(
const std::shared_ptr<pipeline::CountedFinishDependency>& dependency);

int64_t get_synced_size() const { return _synced_size; }

bool isset_synced_size() const { return _synced_size != -1; }
int64_t get_synced_size() const {
if (_synced_size == -1 || !_dependency) {
throw Exception(doris::ErrorCode::INTERNAL_ERROR,
"sync filter size meet error, filter: {}", debug_string());
}
return _synced_size;
}

protected:
// serialize _wrapper to protobuf
Expand Down
9 changes: 2 additions & 7 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ class VRuntimeFilterSlots {
}

// use synced size when this rf has global merged
static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t hash_table_size) {
return runtime_filter->isset_synced_size() ? runtime_filter->get_synced_size()
: hash_table_size;
static uint64_t get_real_size(IRuntimeFilter* filter, uint64_t hash_table_size) {
return filter->need_sync_filter_size() ? filter->get_synced_size() : hash_table_size;
}

Status ignore_filters(RuntimeState* state) {
Expand Down Expand Up @@ -119,10 +118,6 @@ class VRuntimeFilterSlots {
}

if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
if (filter->need_sync_filter_size() != filter->isset_synced_size()) {
return Status::InternalError("sync filter size meet error, filter: {}",
filter->debug_string());
}
RETURN_IF_ERROR(filter->init_bloom_filter(
get_real_size(filter.get(), local_hash_table_size)));
}
Expand Down
31 changes: 16 additions & 15 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1678,13 +1678,16 @@ void BlockFileCache::check_disk_resource_limit() {
LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
return;
}
auto [capacity_percentage, inode_percentage] = percent;
auto inode_is_insufficient = [](const int& inode_percentage) {
return inode_percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
auto [space_percentage, inode_percentage] = percent;
auto is_insufficient = [](const int& percentage) {
return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
};
DCHECK(capacity_percentage >= 0 && capacity_percentage <= 100);
DCHECK(inode_percentage >= 0 && inode_percentage <= 100);
// ATTN: due to that can be change, so if its invalid, set it to default value
DCHECK_GE(space_percentage, 0);
DCHECK_LE(space_percentage, 100);
DCHECK_GE(inode_percentage, 0);
DCHECK_LE(inode_percentage, 100);
// ATTN: due to that can be changed dynamically, set it to default value if it's invalid
// FIXME: reject with config validator
if (config::file_cache_enter_disk_resource_limit_mode_percent <=
config::file_cache_exit_disk_resource_limit_mode_percent) {
LOG_WARNING("config error, set to default value")
Expand All @@ -1693,23 +1696,21 @@ void BlockFileCache::check_disk_resource_limit() {
config::file_cache_enter_disk_resource_limit_mode_percent = 90;
config::file_cache_exit_disk_resource_limit_mode_percent = 80;
}
if (capacity_percentage >= config::file_cache_enter_disk_resource_limit_mode_percent ||
inode_is_insufficient(inode_percentage)) {
if (is_insufficient(space_percentage) || is_insufficient(inode_percentage)) {
_disk_resource_limit_mode = true;
_disk_limit_mode_metrics->set_value(1);
} else if (_disk_resource_limit_mode &&
(capacity_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
(space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
(inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
_disk_resource_limit_mode = false;
_disk_limit_mode_metrics->set_value(0);
}
if (_disk_resource_limit_mode) {
// log per mins
LOG_EVERY_N(WARNING, 3) << "file cache background thread space percent="
<< capacity_percentage << " inode percent=" << inode_percentage
<< " is inode insufficient="
<< inode_is_insufficient(inode_percentage)
<< " mode run in resource limit";
LOG(WARNING) << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
<< " inode_percent=" << inode_percentage
<< " is_space_insufficient=" << is_insufficient(space_percentage)
<< " is_inode_insufficient=" << is_insufficient(inode_percentage)
<< " mode run in resource limit";
}
}

Expand Down
Loading

0 comments on commit 0d1d9db

Please sign in to comment.