Skip to content

Commit

Permalink
Merge branch 'master' into enhancement_nereids_show-catalog-support
Browse files Browse the repository at this point in the history
  • Loading branch information
Yao-MR authored Dec 19, 2024
2 parents d360f66 + 2a7c2e3 commit e65858d
Show file tree
Hide file tree
Showing 2,739 changed files with 95,241 additions and 16,817 deletions.
7 changes: 1 addition & 6 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@ github:
- cloud_p0 (Doris Cloud Regression)
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
- Build Broker
- ShellCheck
- Build Broker
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- Build Third Party Libraries (macOS-arm64)
- COMPILE (DORIS_COMPILE)
- Need_2_Approval
- Cloud UT (Doris Cloud UT)
- performance (Doris Performance)

required_pull_request_reviews:
dismiss_stale_reviews: true
Expand All @@ -80,7 +78,6 @@ github:
- Clang Formatter
- CheckStyle
- Build Broker
- ShellCheck
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- FE UT (Doris FE UT)
Expand All @@ -103,7 +100,6 @@ github:
- Clang Formatter
- CheckStyle
- Build Broker
- ShellCheck
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- COMPILE (DORIS_COMPILE)
Expand All @@ -128,7 +124,6 @@ github:
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
- Build Broker
- ShellCheck
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- COMPILE (DORIS_COMPILE)
Expand Down
Binary file added aazcp.tar.gz
Binary file not shown.
4 changes: 3 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1630,11 +1630,13 @@ void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
dropped_tablet->tablet_uid());
LOG_INFO("successfully drop tablet")
.tag("signature", req.signature)
.tag("tablet_id", drop_tablet_req.tablet_id);
.tag("tablet_id", drop_tablet_req.tablet_id)
.tag("replica_id", drop_tablet_req.replica_id);
} else {
LOG_WARNING("failed to drop tablet")
.tag("signature", req.signature)
.tag("tablet_id", drop_tablet_req.tablet_id)
.tag("replica_id", drop_tablet_req.replica_id)
.error(status);
}

Expand Down
1 change: 1 addition & 0 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "agent/workload_group_listener.h"

#include "runtime/exec_env.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
Expand Down
3 changes: 2 additions & 1 deletion be/src/agent/workload_group_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
#include <glog/logging.h>

#include "agent/topic_listener.h"
#include "runtime/exec_env.h"

namespace doris {

class ExecEnv;

class WorkloadGroupListener : public TopicListener {
public:
~WorkloadGroupListener() {}
Expand Down
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ Status CloudBaseCompaction::prepare_compact() {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
Expand Down Expand Up @@ -320,6 +321,10 @@ Status CloudBaseCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Expand Down
46 changes: 19 additions & 27 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "util/uuid_generator.h"

namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size");
Expand Down Expand Up @@ -91,6 +92,10 @@ Status CloudCumulativeCompaction::prepare_compact() {
// plus 1 to skip the delete version.
// NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter.
update_cumulative_point();
if (!config::enable_sleep_between_delete_cumu_compaction) {
st = Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(
"_last_delete_version.first not equal to -1");
}
}
return st;
}
Expand Down Expand Up @@ -263,6 +268,10 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_output_versions(_output_rowset->end_version());
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
Expand Down Expand Up @@ -371,37 +380,17 @@ Status CloudCumulativeCompaction::modify_rowsets() {
Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
// agg previously rowset old version delete bitmap
std::vector<RowsetSharedPtr> pre_rowsets {};
std::vector<std::string> pre_rowset_ids {};
for (const auto& it : cloud_tablet()->rowset_map()) {
if (it.first.second < _input_rowsets.front()->start_version()) {
pre_rowsets.emplace_back(it.second);
pre_rowset_ids.emplace_back(it.second->rowset_id().to_string());
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
auto pre_max_version = _output_rowset->version().second;
DeleteBitmapPtr new_delete_bitmap =
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
DeleteBitmapPtr new_delete_bitmap = nullptr;
agg_and_remove_old_version_delete_bitmap(pre_rowsets, to_remove_vec, new_delete_bitmap);
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed",
Expand All @@ -421,9 +410,9 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets",
{ static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets(); });
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets();
});
}
}
return Status::OK();
Expand Down Expand Up @@ -486,8 +475,10 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
double process_memory_usage =
cast_set<double>(doris::GlobalMemoryArbitrator::process_memory_usage());
bool memory_usage_high =
process_memory_usage > cast_set<double>(MemInfo::soft_mem_limit()) * 0.8;
if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
Expand Down Expand Up @@ -617,4 +608,5 @@ void CloudCumulativeCompaction::do_lease() {
}
}

#include "common/compile_check_end.h"
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "olap/compaction.h"

namespace doris {
#include "common/compile_check_begin.h"

class CloudCumulativeCompaction : public CloudCompactionMixin {
public:
Expand Down Expand Up @@ -60,4 +61,5 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {
Version _last_delete_version {-1, -1};
};

#include "common/compile_check_end.h"
} // namespace doris
12 changes: 7 additions & 5 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "olap/tablet_meta.h"

namespace doris {
#include "common/compile_check_begin.h"

CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
Expand All @@ -48,7 +49,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size
return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
}

int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
Expand Down Expand Up @@ -114,8 +115,8 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
size_t new_compaction_score = *compaction_score;
while (rs_begin != input_rowsets->end()) {
auto& rs_meta = (*rs_begin)->rowset_meta();
int current_level = _level_size(rs_meta->total_disk_size());
int remain_level = _level_size(total_size - rs_meta->total_disk_size());
int64_t current_level = _level_size(rs_meta->total_disk_size());
int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size());
// if current level less then remain level, input rowsets contain current rowset
// and process return; otherwise, input rowsets do not contain current rowset.
if (current_level <= remain_level) {
Expand Down Expand Up @@ -185,7 +186,7 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
}

int64_t CloudSizeBasedCumulativeCompactionPolicy::cloud_promotion_size(CloudTablet* t) const {
int64_t promotion_size = int64_t(t->base_size() * _promotion_ratio);
int64_t promotion_size = int64_t(cast_set<double>(t->base_size()) * _promotion_ratio);
// promotion_size is between _size_based_promotion_size and _size_based_promotion_min_size
return promotion_size > _promotion_size ? _promotion_size
: promotion_size < _promotion_min_size ? _promotion_min_size
Expand Down Expand Up @@ -215,7 +216,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
: last_cumulative_point;
}

int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
Expand Down Expand Up @@ -377,4 +378,5 @@ int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
return output_rowset->end_version() + 1;
}

#include "common/compile_check_end.h"
} // namespace doris
8 changes: 5 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "olap/rowset/rowset_meta.h"

namespace doris {
#include "common/compile_check_begin.h"

class Tablet;
struct Version;
Expand All @@ -44,7 +45,7 @@ class CloudCumulativeCompactionPolicy {

virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) = 0;

virtual int32_t pick_input_rowsets(CloudTablet* tablet,
virtual int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
Expand All @@ -71,7 +72,7 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
return 0;
}

int32_t pick_input_rowsets(CloudTablet* tablet,
int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
Expand Down Expand Up @@ -106,7 +107,7 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override;

int32_t pick_input_rowsets(CloudTablet* tablet,
int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
Expand All @@ -115,4 +116,5 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti
bool allow_delete = false) override;
};

#include "common/compile_check_end.h"
} // namespace doris
Loading

0 comments on commit e65858d

Please sign in to comment.