Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into refresh_database_…
Browse files Browse the repository at this point in the history
…nereids
  • Loading branch information
vinlee19 committed Nov 29, 2024
2 parents 0c252c0 + bc67fc9 commit 85d6c87
Show file tree
Hide file tree
Showing 421 changed files with 10,045 additions and 3,347 deletions.
1 change: 0 additions & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ github:
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
- Build Broker
- ShellCheck
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- COMPILE (DORIS_COMPILE)
Expand Down
23 changes: 19 additions & 4 deletions be/src/common/cgroup_memory_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "common/status.h"
#include "util/cgroup_util.h"
#include "util/error_util.h"

namespace doris {

Expand Down Expand Up @@ -84,14 +85,23 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
: _mount_file_dir(std::move(mount_file_dir)) {}

Status read_memory_limit(int64_t* value) override {
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file((_mount_file_dir / "memory.max"),
value));
std::filesystem::path file_path = _mount_file_dir / "memory.max";
std::string line;
std::ifstream file_stream(file_path, std::ios::in);
getline(file_stream, line);
if (file_stream.fail() || file_stream.bad()) {
return Status::CgroupError("Error reading {}: {}", file_path.string(),
get_str_err_msg());
}
if (line == "max") {
*value = std::numeric_limits<int64_t>::max();
return Status::OK();
}
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(file_path, value));
return Status::OK();
}

Status read_memory_usage(int64_t* value) override {
// memory.current contains a single number
// the reason why we subtract it described here: https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(
(_mount_file_dir / "memory.current"), value));
std::unordered_map<std::string, int64_t> metrics_map;
Expand All @@ -100,7 +110,12 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
if (*value < metrics_map["inactive_file"]) {
return Status::CgroupError("CgroupsV2Reader read_memory_usage negative memory usage");
}
// the reason why we subtract inactive_file described here:
// https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
*value -= metrics_map["inactive_file"];
// Part of "slab" that might be reclaimed, such as dentries and inodes.
// https://arthurchiao.art/blog/cgroupv2-zh/
*value -= metrics_map["slab_reclaimable"];
return Status::OK();
}

Expand Down
14 changes: 7 additions & 7 deletions be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,18 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
return Status::InternalError("bloomfilter_func is nullptr");
}
if (bloomfilter_func->_bloom_filter == nullptr) {
return Status::InternalError("bloomfilter_func->_bloom_filter is nullptr");
return Status::InternalError(
"bloomfilter_func->_bloom_filter is nullptr, bloomfilter_func->inited: {}",
bloomfilter_func->_inited);
}
// If `_inited` is false, there is no memory allocated in bloom filter and this is the first
// call for `merge` function. So we just reuse this bloom filter, and we don't need to
// allocate memory again.
if (!_inited) {
auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
if (_bloom_filter != nullptr) {
return Status::InternalError("_bloom_filter must is nullptr");
return Status::InternalError("_bloom_filter must is nullptr, inited: {}", _inited);
}
_bloom_filter = bloomfilter_func->_bloom_filter;
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
_inited = true;
light_copy(bloomfilter_func);
return Status::OK();
}
auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
Expand Down Expand Up @@ -207,7 +206,8 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {

bool contain_null() const {
if (!_bloom_filter) {
throw Exception(ErrorCode::INTERNAL_ERROR, "_bloom_filter is nullptr");
throw Exception(ErrorCode::INTERNAL_ERROR, "_bloom_filter is nullptr, inited: {}",
_inited);
}
return _bloom_filter->contain_null();
}
Expand Down
8 changes: 3 additions & 5 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1114,9 +1114,6 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
std::lock_guard l(*local_merge_filters->lock);
local_merge_filters->merge_size_times--;
local_merge_filters->local_merged_size += local_filter_size;
if (_has_local_target) {
set_synced_size(local_filter_size);
}
if (local_merge_filters->merge_size_times) {
return Status::OK();
} else {
Expand Down Expand Up @@ -1539,9 +1536,10 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_
std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, "
"build_bf_cardinality: {}, error_msg: {}",
"build_bf_cardinality: {}, ignored: {}, error_msg: {}",
_filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
_wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg);
_wrapper->get_build_bf_cardinality(), _wrapper->is_ignored(),
_wrapper->_context->err_msg);
}

Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
Expand Down
26 changes: 21 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,15 +662,28 @@ Status Compaction::do_inverted_index_compaction() {
try {
std::vector<std::unique_ptr<DorisCompoundReader>> src_idx_dirs(src_segment_num);
for (int src_segment_id = 0; src_segment_id < src_segment_num; src_segment_id++) {
src_idx_dirs[src_segment_id] =
DORIS_TRY(inverted_index_file_readers[src_segment_id]->open(index_meta));
auto res = inverted_index_file_readers[src_segment_id]->open(index_meta);
DBUG_EXECUTE_IF("Compaction::open_inverted_index_file_reader", {
res = ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: Compaction::open_index_file_reader error"));
})
if (!res.has_value()) {
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
src_idx_dirs[src_segment_id] = std::move(res.value());
}
for (int dest_segment_id = 0; dest_segment_id < dest_segment_num; dest_segment_id++) {
auto dest_dir =
DORIS_TRY(inverted_index_file_writers[dest_segment_id]->open(index_meta));
auto res = inverted_index_file_writers[dest_segment_id]->open(index_meta);
DBUG_EXECUTE_IF("Compaction::open_inverted_index_file_writer", {
res = ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: Compaction::open_inverted_index_file_writer error"));
})
if (!res.has_value()) {
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
// Destination directories in dest_index_dirs do not need to be deconstructed,
// but their lifecycle must be managed by inverted_index_file_writers.
dest_index_dirs[dest_segment_id] = dest_dir.get();
dest_index_dirs[dest_segment_id] = res.value().get();
}
auto st = compact_column(index_meta->index_id(), src_idx_dirs, dest_index_dirs,
index_tmp_path.native(), trans_vec, dest_segment_num_rows);
Expand All @@ -681,6 +694,9 @@ Status Compaction::do_inverted_index_compaction() {
} catch (CLuceneError& e) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
} catch (const Exception& e) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
<< ", delete_sign_idx=" << delete_sign_idx;
// for duplicate no keys
if (!key_columns.empty()) {
column_groups->emplace_back(std::move(key_columns));
column_groups->emplace_back(key_columns);
}

std::vector<uint32_t> value_columns;
Expand Down
Loading

0 comments on commit 85d6c87

Please sign in to comment.