Commit

Merge branch 'branch-3.0' into branch-3.0_20241204_pick
xinyiZzz authored Dec 5, 2024
2 parents 63088a0 + e8ce643 commit 9ed9349
Showing 82 changed files with 2,690 additions and 266 deletions.
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -218,7 +218,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
.base_compaction_cnt = _ms_base_compaction_cnt,
.cumulative_compaction_cnt = _ms_cumulative_compaction_cnt,
.cumulative_point = _ms_cumulative_point};
auto update_delete_bitmap_time_us = 0;
int64_t update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) &&
_version == previous_publish_info.publish_version &&
_ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
@@ -397,8 +397,8 @@ int CloudTablet::delete_expired_stale_rowsets() {
}

for (int64_t path_id : path_ids) {
int start_version = -1;
int end_version = -1;
int64_t start_version = -1;
int64_t end_version = -1;
// delete stale versions in version graph
auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
for (auto& v_ts : version_path->timestamped_versions()) {
23 changes: 19 additions & 4 deletions be/src/common/cgroup_memory_ctl.cpp
@@ -27,6 +27,7 @@

#include "common/status.h"
#include "util/cgroup_util.h"
#include "util/error_util.h"

namespace doris {

@@ -84,14 +85,23 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
: _mount_file_dir(std::move(mount_file_dir)) {}

Status read_memory_limit(int64_t* value) override {
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file((_mount_file_dir / "memory.max"),
value));
std::filesystem::path file_path = _mount_file_dir / "memory.max";
std::string line;
std::ifstream file_stream(file_path, std::ios::in);
getline(file_stream, line);
if (file_stream.fail() || file_stream.bad()) {
return Status::CgroupError("Error reading {}: {}", file_path.string(),
get_str_err_msg());
}
if (line == "max") {
*value = std::numeric_limits<int64_t>::max();
return Status::OK();
}
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(file_path, value));
return Status::OK();
}

Status read_memory_usage(int64_t* value) override {
// memory.current contains a single number
// the reason why we subtract it described here: https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(
(_mount_file_dir / "memory.current"), value));
std::unordered_map<std::string, int64_t> metrics_map;
@@ -100,7 +110,12 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
if (*value < metrics_map["inactive_file"]) {
return Status::CgroupError("CgroupsV2Reader read_memory_usage negative memory usage");
}
// the reason why we subtract inactive_file described here:
// https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
*value -= metrics_map["inactive_file"];
// Part of "slab" that might be reclaimed, such as dentries and inodes.
// https://arthurchiao.art/blog/cgroupv2-zh/
*value -= metrics_map["slab_reclaimable"];
return Status::OK();
}

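The new cgroup v2 logic boils down to two rules: memory.max may literally contain the string "max" (meaning no limit), and "used" memory is memory.current minus the reclaimable parts reported in memory.stat. Below is a minimal standalone sketch of that arithmetic; it assumes /sys/fs/cgroup as the v2 mount point and uses illustrative helpers, not Doris's CGroupUtil/CGroupMemoryCtl API.

// Standalone sketch only: illustrative helpers, not Doris's CGroupUtil/CGroupMemoryCtl API.
#include <cstdint>
#include <fstream>
#include <iostream>
#include <limits>
#include <string>
#include <unordered_map>

static int64_t read_cgroup_v2_memory_limit(const std::string& cgroup_dir) {
    std::ifstream in(cgroup_dir + "/memory.max");
    std::string line;
    std::getline(in, line);
    if (in.fail() || line == "max") {
        // "max" (or a read failure, for this sketch) means no limit.
        return std::numeric_limits<int64_t>::max();
    }
    return std::stoll(line); // throws on malformed input; real code would return an error status
}

static int64_t read_cgroup_v2_memory_usage(const std::string& cgroup_dir) {
    int64_t usage = 0;
    std::ifstream cur(cgroup_dir + "/memory.current");
    cur >> usage;

    // memory.stat is a list of "<key> <value>" lines.
    std::unordered_map<std::string, int64_t> stat;
    std::ifstream st(cgroup_dir + "/memory.stat");
    std::string key;
    int64_t value = 0;
    while (st >> key >> value) {
        stat[key] = value;
    }
    // Page cache that has not been used recently and reclaimable slab (dentries, inodes)
    // can be dropped under memory pressure, so they are not counted as "used".
    usage -= stat["inactive_file"];
    usage -= stat["slab_reclaimable"];
    return usage;
}

int main() {
    const std::string dir = "/sys/fs/cgroup"; // assumed cgroup v2 mount point
    std::cout << "limit=" << read_cgroup_v2_memory_limit(dir)
              << " usage=" << read_cgroup_v2_memory_usage(dir) << std::endl;
    return 0;
}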
4 changes: 0 additions & 4 deletions be/src/exec/schema_scanner.cpp
@@ -128,7 +128,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
SCOPED_ATTACH_TASK(state);
_dependency->block();
_async_thread_running = true;
_finish_dependency->block();
if (!_opened) {
_data_block = vectorized::Block::create_unique();
_init_block(_data_block.get());
@@ -144,9 +143,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
_eos = eos;
_async_thread_running = false;
_dependency->set_ready();
if (eos) {
_finish_dependency->set_ready();
}
}));
return Status::OK();
}
7 changes: 1 addition & 6 deletions be/src/exec/schema_scanner.h
@@ -108,11 +108,7 @@ class SchemaScanner {
// factory function
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
TSchemaTableType::type type() const { return _schema_table_type; }
void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency> fin_dep) {
_dependency = dep;
_finish_dependency = fin_dep;
}
void set_dependency(std::shared_ptr<pipeline::Dependency> dep) { _dependency = dep; }
Status get_next_block_async(RuntimeState* state);

protected:
@@ -141,7 +137,6 @@ class SchemaScanner {
RuntimeProfile::Counter* _fill_block_timer = nullptr;

std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;

std::unique_ptr<vectorized::Block> _data_block;
AtomicStatus _scanner_status;
11 changes: 11 additions & 0 deletions be/src/http/http_handler_with_auth.cpp
@@ -35,6 +35,7 @@ HttpHandlerWithAuth::HttpHandlerWithAuth(ExecEnv* exec_env, TPrivilegeHier::type
: _exec_env(exec_env), _hier(hier), _type(type) {}

int HttpHandlerWithAuth::on_header(HttpRequest* req) {
// If the return value isn't 0, `send_reply` must be called before returning, so the request doesn't hang without a response.
TCheckAuthRequest auth_request;
TCheckAuthResult auth_result;
AuthInfo auth_info;
@@ -83,13 +84,22 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) {

#ifndef BE_TEST
TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
if (master_addr.hostname.empty() || master_addr.port == 0) {
LOG(WARNING) << "Not found master fe, Can't auth API request: " << req->debug_string();
HttpChannel::send_error(req, HttpStatus::SERVICE_UNAVAILABLE);
return -1;
}
{
auto status = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&auth_result, &auth_request](FrontendServiceConnection& client) {
client->checkAuth(auth_result, auth_request);
});
if (!status) {
LOG(WARNING) << "CheckAuth Rpc Fail.Fe Ip:" << master_addr.hostname
<< ", Fe port:" << master_addr.port << ".Status:" << status.to_string()
<< ".Request: " << req->debug_string();
HttpChannel::send_error(req, HttpStatus::SERVICE_UNAVAILABLE);
return -1;
}
}
@@ -98,6 +108,7 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) {
auth_result.status.status_code = TStatusCode::type::OK;
auth_result.status.error_msgs.clear();
} else {
HttpChannel::send_reply(req, HttpStatus::FORBIDDEN);
return -1;
}
#endif
2 changes: 1 addition & 1 deletion be/src/olap/tablet_meta.cpp
@@ -664,7 +664,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
RowsetId rst_id;
rst_id.init(tablet_meta_pb.delete_bitmap().rowset_ids(i));
auto seg_id = tablet_meta_pb.delete_bitmap().segment_ids(i);
uint32_t ver = tablet_meta_pb.delete_bitmap().versions(i);
auto ver = tablet_meta_pb.delete_bitmap().versions(i);
auto bitmap = tablet_meta_pb.delete_bitmap().segment_delete_bitmaps(i).data();
delete_bitmap().delete_bitmap[{rst_id, seg_id, ver}] = roaring::Roaring::read(bitmap);
}
5 changes: 1 addition & 4 deletions be/src/pipeline/exec/schema_scan_operator.cpp
@@ -48,7 +48,7 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
// new one scanner
_schema_scanner = SchemaScanner::create(schema_table->schema_table_type());

_schema_scanner->set_dependency(_data_dependency, _finish_dependency);
_schema_scanner->set_dependency(_data_dependency);
if (nullptr == _schema_scanner) {
return Status::InternalError("schema scanner get nullptr pointer.");
}
@@ -266,9 +266,6 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
} while (block->rows() == 0 && !*eos);

local_state.reached_limit(block, eos);
if (*eos) {
local_state._finish_dependency->set_always_ready();
}
return Status::OK();
}

5 changes: 0 additions & 5 deletions be/src/pipeline/exec/schema_scan_operator.h
@@ -36,9 +36,6 @@ class SchemaScanLocalState final : public PipelineXLocalState<> {

SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent) {
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY", true);
_data_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_DEPENDENCY", true);
}
@@ -48,7 +45,6 @@ class SchemaScanLocalState final : public PipelineXLocalState<> {

Status open(RuntimeState* state) override;

Dependency* finishdependency() override { return _finish_dependency.get(); }
std::vector<Dependency*> dependencies() const override { return {_data_dependency.get()}; }

private:
@@ -57,7 +53,6 @@ class SchemaScanLocalState final : public PipelineXLocalState<> {
SchemaScannerParam _scanner_param;
std::unique_ptr<SchemaScanner> _schema_scanner;

std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<Dependency> _data_dependency;
};

8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline.h
@@ -73,6 +73,14 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE;
}

// For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,
// data is processed and shuffled on the sink.
// Compared to PASSTHROUGH, this is a relatively heavy operation.
static bool heavy_operations_on_the_sink(ExchangeType idx) {
return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE ||
idx == ExchangeType::ADAPTIVE_PASSTHROUGH;
}

bool need_to_local_exchange(const DataDistribution target_data_distribution,
const int idx) const;
void init_data_distribution() {
10 changes: 7 additions & 3 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -807,7 +807,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
case ExchangeType::ADAPTIVE_PASSTHROUGH:
shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
? _runtime_state->query_options().local_exchange_free_blocks_limit
: 0);
@@ -907,9 +907,13 @@ Status PipelineFragmentContext::_add_local_exchange(
<< " cur_pipe->operators().size(): " << cur_pipe->operators().size()
<< " new_pip->operators().size(): " << new_pip->operators().size();

// Add passthrough local exchanger if necessary
// There are some local shuffles with relatively heavy operations on the sink.
// If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
// Therefore, local passthrough is used to increase the concurrency of the sink.
// op -> local sink(1) -> local source (n)
// op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source (n)
if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
Pipeline::is_hash_exchange(data_distribution.distribution_type)) {
Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
RETURN_IF_ERROR(_add_local_exchange_impl(
new_pip->operators().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2),
DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets,
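As a rough sketch of the decision introduced above: an extra PASSTHROUGH local exchange is inserted when a shuffle-like exchange that does its heavy work on the sink would run that sink with a single task while the source side runs with many. The names below (source_pipeline_tasks / sink_pipeline_tasks) are illustrative stand-ins for cur_pipe->num_tasks() and new_pip->num_tasks(), assuming that mapping; this is not the Doris pipeline API.

// Sketch of the "add a passthrough in front of a single-task heavy sink" rule.
// Parameter names are illustrative stand-ins, not the Doris pipeline API.
#include <iostream>

enum class ExchangeType { PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, ADAPTIVE_PASSTHROUGH };

// Mirrors Pipeline::heavy_operations_on_the_sink from the pipeline.h hunk above.
static bool heavy_operations_on_the_sink(ExchangeType t) {
    return t == ExchangeType::HASH_SHUFFLE || t == ExchangeType::BUCKET_HASH_SHUFFLE ||
           t == ExchangeType::ADAPTIVE_PASSTHROUGH;
}

// op -> local sink(1) -> local source(n)                                : sink is the bottleneck
// op -> passthrough(1) -> passthrough(n) -> local sink(n) -> source(n)  : sink runs with n tasks
static bool need_extra_passthrough(int source_pipeline_tasks, int sink_pipeline_tasks,
                                   ExchangeType t) {
    return source_pipeline_tasks > 1 && sink_pipeline_tasks == 1 &&
           heavy_operations_on_the_sink(t);
}

int main() {
    std::cout << std::boolalpha
              << need_extra_passthrough(8, 1, ExchangeType::HASH_SHUFFLE) << "\n"  // true
              << need_extra_passthrough(8, 8, ExchangeType::HASH_SHUFFLE) << "\n"  // false
              << need_extra_passthrough(8, 1, ExchangeType::PASSTHROUGH) << "\n";  // false
    return 0;
}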
6 changes: 3 additions & 3 deletions be/src/runtime/load_channel.cpp
@@ -135,7 +135,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
_is_high_priority, _self_profile);
}
{
std::lock_guard<SpinLock> l(_tablets_channels_lock);
std::lock_guard<std::mutex> l(_tablets_channels_lock);
_tablets_channels.insert({index_id, channel});
}
}
@@ -237,7 +237,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
if (finished) {
std::lock_guard<std::mutex> l(_lock);
{
std::lock_guard<SpinLock> l(_tablets_channels_lock);
std::lock_guard<std::mutex> l(_tablets_channels_lock);
_tablets_channels_rows.insert(std::make_pair(
index_id,
std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
@@ -263,7 +263,7 @@ void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
_self_profile->set_timestamp(_last_updated_time);

{
std::lock_guard<SpinLock> l(_tablets_channels_lock);
std::lock_guard<std::mutex> l(_tablets_channels_lock);
for (auto& it : _tablets_channels) {
it.second->refresh_profile();
}
2 changes: 1 addition & 1 deletion be/src/runtime/load_channel.h
@@ -104,7 +104,7 @@ class LoadChannel {
std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>> _tablets_channels;
// index id -> (received rows, filtered rows)
std::unordered_map<int64_t, std::pair<size_t, size_t>> _tablets_channels_rows;
SpinLock _tablets_channels_lock;
std::mutex _tablets_channels_lock;
// This is to save finished channels id, to handle the retry request.
std::unordered_set<int64_t> _finished_channel_ids;
// set to true if at least one tablets channel has been opened
18 changes: 15 additions & 3 deletions be/src/util/mem_info.cpp
@@ -197,9 +197,10 @@ void MemInfo::refresh_proc_meminfo() {
_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
// find cgroup limit failed, wait 300s, 1000 * 100ms.
_s_cgroup_mem_refresh_wait_times = -3000;
LOG(INFO) << "Refresh cgroup memory limit failed, refresh again after 300s, cgroup "
"mem limit: "
<< _s_cgroup_mem_limit;
LOG(WARNING)
<< "Refresh cgroup memory limit failed, refresh again after 300s, cgroup "
"mem limit: "
<< _s_cgroup_mem_limit << ", " << status;
} else {
_s_cgroup_mem_limit = cgroup_mem_limit;
// wait 10s, 100 * 100ms, avoid too frequently.
@@ -209,12 +210,17 @@ void MemInfo::refresh_proc_meminfo() {
_s_cgroup_mem_refresh_wait_times++;
}

// cgroup mem limit is refreshed every 10 seconds,
// cgroup mem usage is refreshed together with memInfo every time, which is very frequent.
if (_s_cgroup_mem_limit != std::numeric_limits<int64_t>::max()) {
int64_t cgroup_mem_usage;
auto status = CGroupMemoryCtl::find_cgroup_mem_usage(&cgroup_mem_usage);
if (!status.ok()) {
_s_cgroup_mem_usage = std::numeric_limits<int64_t>::min();
_s_cgroup_mem_refresh_state = false;
LOG_EVERY_N(WARNING, 500)
<< "Refresh cgroup memory usage failed, cgroup mem limit: "
<< _s_cgroup_mem_limit << ", " << status;
} else {
_s_cgroup_mem_usage = cgroup_mem_usage;
_s_cgroup_mem_refresh_state = true;
@@ -279,6 +285,12 @@ void MemInfo::refresh_proc_meminfo() {
mem_available = _mem_info_bytes["MemAvailable"];
}
if (_s_cgroup_mem_refresh_state) {
// Note, CgroupV2 MemAvailable is usually a little smaller than Process MemAvailable.
// Process `MemAvailable = MemFree - LowWaterMark + (PageCache - min(PageCache / 2, LowWaterMark))`,
// from `MemAvailable` in `/proc/meminfo`, calculated by OS.
// CgroupV2 `MemAvailable = cgroup_mem_limit - cgroup_mem_usage`,
// `cgroup_mem_usage = memory.current - inactive_file - slab_reclaimable`, in fact,
// there seems to be some memory that can be reused in `cgroup_mem_usage`.
if (mem_available < 0) {
mem_available = _s_cgroup_mem_limit - _s_cgroup_mem_usage;
} else {
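The comment above gives the two formulas; the hunk is cut off before the else branch, so the combination below (taking the smaller of the process-level and cgroup-level estimates) is an assumed, conservative sketch rather than a quote of the actual code.

// Sketch only: combining the two MemAvailable estimates described in the comment above.
// Taking the minimum is an assumption (a conservative choice), not a quote of the Doris code.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <limits>

static int64_t effective_mem_available(int64_t proc_mem_available, // /proc/meminfo MemAvailable, -1 if absent
                                        int64_t cgroup_mem_limit,   // int64 max when there is no cgroup limit
                                        int64_t cgroup_mem_usage,   // memory.current - inactive_file - slab_reclaimable
                                        bool cgroup_refresh_ok) {
    if (!cgroup_refresh_ok || cgroup_mem_limit == std::numeric_limits<int64_t>::max()) {
        return proc_mem_available;
    }
    const int64_t cgroup_available = cgroup_mem_limit - cgroup_mem_usage;
    if (proc_mem_available < 0) {
        // Old kernels without MemAvailable in /proc/meminfo: fall back to the cgroup estimate.
        return cgroup_available;
    }
    // CgroupV2 MemAvailable is usually a little smaller than the process-level value,
    // so the minimum keeps the estimate conservative.
    return std::min(proc_mem_available, cgroup_available);
}

int main() {
    // Host reports 32 GB available, but the cgroup leaves only 8 - 4 = 4 GB of headroom.
    std::cout << effective_mem_available(32LL << 30, 8LL << 30, 4LL << 30, true) << std::endl;
    return 0;
}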
10 changes: 8 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_collect.cpp
@@ -72,12 +72,18 @@ AggregateFunctionPtr create_aggregate_function_collect_impl(const std::string& n
if (which.is_date_or_datetime()) {
return do_create_agg_function_collect<Int64, HasLimit, ShowNull>(distinct, argument_types,
result_is_nullable);
} else if (which.is_date_v2() || which.is_ipv4()) {
} else if (which.is_date_v2()) {
return do_create_agg_function_collect<UInt32, HasLimit, ShowNull>(distinct, argument_types,
result_is_nullable);
} else if (which.is_date_time_v2() || which.is_ipv6()) {
} else if (which.is_date_time_v2()) {
return do_create_agg_function_collect<UInt64, HasLimit, ShowNull>(distinct, argument_types,
result_is_nullable);
} else if (which.is_ipv6()) {
return do_create_agg_function_collect<IPv6, HasLimit, ShowNull>(distinct, argument_types,
result_is_nullable);
} else if (which.is_ipv4()) {
return do_create_agg_function_collect<IPv4, HasLimit, ShowNull>(distinct, argument_types,
result_is_nullable);
} else if (which.is_string()) {
return do_create_agg_function_collect<StringRef, HasLimit, ShowNull>(
distinct, argument_types, result_is_nullable);
2 changes: 2 additions & 0 deletions be/src/vec/data_types/serde/data_type_array_serde.h
@@ -101,6 +101,8 @@ class DataTypeArraySerDe : public DataTypeSerDe {
nested_serde->set_return_object_as_string(value);
}

virtual DataTypeSerDeSPtrs get_nested_serdes() const override { return {nested_serde}; }

private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
4 changes: 4 additions & 0 deletions be/src/vec/data_types/serde/data_type_map_serde.h
@@ -95,6 +95,10 @@ class DataTypeMapSerDe : public DataTypeSerDe {
value_serde->set_return_object_as_string(value);
}

virtual DataTypeSerDeSPtrs get_nested_serdes() const override {
return {key_serde, value_serde};
}

private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
2 changes: 2 additions & 0 deletions be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -99,6 +99,8 @@ class DataTypeNullableSerDe : public DataTypeSerDe {
int row_num) const override;
Status read_one_cell_from_json(IColumn& column, const rapidjson::Value& result) const override;

virtual DataTypeSerDeSPtrs get_nested_serdes() const override { return {nested_serde}; }

private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,