Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wg](chore) rename workload group memory property names #44028

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadGroupsScanner::_s_tbls_colu
{"SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"MAX_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"MEMORY_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"MEMORY_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"TAG", TYPE_VARCHAR, sizeof(StringRef), true},
{"READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
{"REMOTE_READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
Expand Down
42 changes: 21 additions & 21 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ namespace doris {
const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
const static int MEMORY_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int MEMORY_HIGH_WATERMARK_DEFAULT_VALUE = 80;

WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info) : WorkloadGroup(wg_info, true) {}

Expand All @@ -64,8 +64,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_
_scan_thread_num(tg_info.scan_thread_num),
_max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num),
_min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num),
_spill_low_watermark(tg_info.spill_low_watermark),
_spill_high_watermark(tg_info.spill_high_watermark),
_memory_low_watermark(tg_info.memory_low_watermark),
_memory_high_watermark(tg_info.memory_high_watermark),
_scan_bytes_per_second(tg_info.read_bytes_per_second),
_remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
_need_create_query_thread_pool(need_create_query_thread_pool) {
Expand All @@ -91,27 +91,27 @@ std::string WorkloadGroup::debug_string() const {
"TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
"{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, "
"spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}, "
"memory_low_watermark={}, memory_high_watermark={}, is_shutdown={}, query_num={}, "
"read_bytes_per_second={}, remote_read_bytes_per_second={}]",
_id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
_scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num,
_spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size(),
_memory_low_watermark, _memory_high_watermark, _is_shutdown, _query_ctxs.size(),
_scan_bytes_per_second, _remote_scan_bytes_per_second);
}

std::string WorkloadGroup::memory_debug_string() const {
return fmt::format(
"TG[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, weighted_memory_limit = {}, total_mem_used = {}, "
"wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, "
"spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
"wg_refresh_interval_memory_growth = {}, memory_low_watermark = {}, "
"memory_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
_id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false",
PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES),
PrettyPrinter::print(_total_mem_used, TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth, TUnit::BYTES),
_spill_low_watermark, _spill_high_watermark, _version, _is_shutdown,
_memory_low_watermark, _memory_high_watermark, _version, _is_shutdown,
_query_ctxs.size());
}

Expand All @@ -137,8 +137,8 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
_scan_thread_num = tg_info.scan_thread_num;
_max_remote_scan_thread_num = tg_info.max_remote_scan_thread_num;
_min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num;
_spill_low_watermark = tg_info.spill_low_watermark;
_spill_high_watermark = tg_info.spill_high_watermark;
_memory_low_watermark = tg_info.memory_low_watermark;
_memory_high_watermark = tg_info.memory_high_watermark;
_scan_bytes_per_second = tg_info.read_bytes_per_second;
_remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second;
} else {
Expand Down Expand Up @@ -396,16 +396,16 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
min_remote_scan_thread_num = tworkload_group_info.min_remote_scan_thread_num;
}

// 12 spill low watermark
int spill_low_watermark = SPILL_LOW_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.spill_threshold_low_watermark) {
spill_low_watermark = tworkload_group_info.spill_threshold_low_watermark;
// 12 memory low watermark
int memory_low_watermark = MEMORY_LOW_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.memory_low_watermark) {
memory_low_watermark = tworkload_group_info.memory_low_watermark;
}

// 13 spil high watermark
int spill_high_watermark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.spill_threshold_high_watermark) {
spill_high_watermark = tworkload_group_info.spill_threshold_high_watermark;
// 13 memory high watermark
int memory_high_watermark = MEMORY_HIGH_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.memory_high_watermark) {
memory_high_watermark = tworkload_group_info.memory_high_watermark;
}

// 14 scan io
Expand Down Expand Up @@ -433,8 +433,8 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.scan_thread_num = scan_thread_num,
.max_remote_scan_thread_num = max_remote_scan_thread_num,
.min_remote_scan_thread_num = min_remote_scan_thread_num,
.spill_low_watermark = spill_low_watermark,
.spill_high_watermark = spill_high_watermark,
.memory_low_watermark = memory_low_watermark,
.memory_high_watermark = memory_high_watermark,
.read_bytes_per_second = read_bytes_per_second,
.remote_read_bytes_per_second = remote_read_bytes_per_second};
}
Expand Down
22 changes: 11 additions & 11 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void do_sweep();

int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
int memory_low_watermark() const {
return _memory_low_watermark.load(std::memory_order_relaxed);
}
int spill_threashold_high_water_mark() const {
return _spill_high_watermark.load(std::memory_order_relaxed);
int memory_high_watermark() const {
return _memory_high_watermark.load(std::memory_order_relaxed);
}

void set_weighted_memory_ratio(double ratio);
Expand All @@ -107,7 +107,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
_total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
if ((realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
_memory_high_watermark.load(std::memory_order_relaxed) / 100))) {
return false;
} else {
_wg_refresh_interval_memory_growth.fetch_add(size);
Expand All @@ -122,10 +122,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
*is_low_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
_memory_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
_memory_high_watermark.load(std::memory_order_relaxed) / 100));
}

std::string debug_string() const;
Expand Down Expand Up @@ -233,8 +233,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::atomic<int> _scan_thread_num;
std::atomic<int> _max_remote_scan_thread_num;
std::atomic<int> _min_remote_scan_thread_num;
std::atomic<int> _spill_low_watermark;
std::atomic<int> _spill_high_watermark;
std::atomic<int> _memory_low_watermark;
std::atomic<int> _memory_high_watermark;
std::atomic<int64_t> _scan_bytes_per_second {-1};
std::atomic<int64_t> _remote_scan_bytes_per_second {-1};

Expand Down Expand Up @@ -282,8 +282,8 @@ struct WorkloadGroupInfo {
const int scan_thread_num = 0;
const int max_remote_scan_thread_num = 0;
const int min_remote_scan_thread_num = 0;
const int spill_low_watermark = 0;
const int spill_high_watermark = 0;
const int memory_low_watermark = 0;
const int memory_high_watermark = 0;
const int read_bytes_per_second = -1;
const int remote_read_bytes_per_second = -1;
// log cgroup cpu info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@ public class SchemaTable extends Table {
.column("SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("MAX_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("MIN_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256))
.column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256))
.column("MEMORY_LOW_WATERMARK", ScalarType.createVarchar(256))
.column("MEMORY_HIGH_WATERMARK", ScalarType.createVarchar(256))
.column("TAG", ScalarType.createVarchar(256))
.column("READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.column("REMOTE_READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
Expand Down
Loading
Loading