Skip to content

Commit

Permalink
Merge branch 'master' into master-auth-classloader
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs authored Sep 24, 2024
2 parents c1fc289 + dd5605e commit 7dad3c7
Show file tree
Hide file tree
Showing 900 changed files with 20,189 additions and 18,213 deletions.
49 changes: 31 additions & 18 deletions be/src/agent/be_exec_version_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@

#include "agent/be_exec_version_manager.h"

namespace doris {
#include "common/exception.h"

const std::map<int, const std::set<std::string>> AGGREGATION_CHANGE_MAP = {
{AGGREGATION_2_1_VERSION,
{"window_funnel", "stddev_samp", "variance_samp", "percentile_approx_weighted",
"percentile_approx", "covar_samp", "percentile", "percentile_array"}}};
namespace doris {

Status BeExecVersionManager::check_be_exec_version(int be_exec_version) {
if (be_exec_version > max_be_exec_version || be_exec_version < min_be_exec_version) {
Expand All @@ -35,19 +32,35 @@ Status BeExecVersionManager::check_be_exec_version(int be_exec_version) {
return Status::OK();
}

void BeExecVersionManager::check_agg_state_compatibility(int current_be_exec_version,
int data_be_exec_version,
std::string function_name) {
if (current_be_exec_version > AGGREGATION_2_1_VERSION &&
data_be_exec_version <= AGGREGATION_2_1_VERSION &&
AGGREGATION_CHANGE_MAP.find(AGGREGATION_2_1_VERSION)->second.contains(function_name)) {
throw Exception(Status::InternalError(
"agg state data with {} is not supported, "
"current_be_exec_version={}, data_be_exec_version={}, need to rebuild the data "
"or set the be_exec_version={} in fe.conf",
function_name, current_be_exec_version, data_be_exec_version,
AGGREGATION_2_1_VERSION));
int BeExecVersionManager::get_function_compatibility(int be_exec_version,
std::string function_name) {
auto it = _function_change_map.find(function_name);
if (it == _function_change_map.end()) {
// 0 means no compatibility issues need to be dealt with
return 0;
}

auto version_it = it->second.lower_bound(be_exec_version);
if (version_it == it->second.end()) {
return 0;
}

return *version_it;
}

void BeExecVersionManager::check_function_compatibility(int current_be_exec_version,
int data_be_exec_version,
std::string function_name) {
if (get_function_compatibility(current_be_exec_version, function_name) ==
get_function_compatibility(data_be_exec_version, function_name)) {
return;
}

throw Exception(Status::InternalError(
"agg state data with {} is not supported, "
"current_be_exec_version={}, data_be_exec_version={}, need to rebuild the data "
"or set the be_exec_version={} in fe.conf temporary",
function_name, current_be_exec_version, data_be_exec_version, data_be_exec_version));
}

/**
Expand Down Expand Up @@ -88,5 +101,5 @@ void BeExecVersionManager::check_agg_state_compatibility(int current_be_exec_ver
*/
const int BeExecVersionManager::max_be_exec_version = 7;
const int BeExecVersionManager::min_be_exec_version = 0;

std::map<std::string, std::set<int>> BeExecVersionManager::_function_change_map {};
} // namespace doris
21 changes: 18 additions & 3 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,43 @@

namespace doris {

constexpr static int AGG_FUNCTION_NEW_WINDOW_FUNNEL = 6;
constexpr inline int BITMAP_SERDE = 3;
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1
constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility issues, see pr #32299
constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable property: PR #37215
constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix PR #38413
constexpr inline int AGGREGATION_2_1_VERSION =
5; // some aggregation changed the data format after this version
6; // some aggregation changed the data format after this version

class BeExecVersionManager {
public:
BeExecVersionManager() = delete;

static Status check_be_exec_version(int be_exec_version);

static void check_agg_state_compatibility(int current_be_exec_version, int data_be_exec_version,
std::string function_name);
static int get_function_compatibility(int be_exec_version, std::string function_name);

static void check_function_compatibility(int current_be_exec_version, int data_be_exec_version,
std::string function_name);

static int get_newest_version() { return max_be_exec_version; }

static std::string get_function_suffix(int be_exec_version) {
return "_for_old_version_" + std::to_string(be_exec_version);
}

// For example, there are incompatible changes between version=7 and version=6, at this time breaking_old_version is 6.
static void registe_old_function_compatibility(int breaking_old_version,
std::string function_name) {
_function_change_map[function_name].insert(breaking_old_version);
}

private:
static const int max_be_exec_version;
static const int min_be_exec_version;
// [function name] -> [breaking change start version]
static std::map<std::string, std::set<int>> _function_change_map;
};

} // namespace doris
47 changes: 27 additions & 20 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,36 +246,43 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
return Status::InvalidArgument(
LOG(WARNING) << "Detected mismatch in cloud mode configuration between FE and BE. "
<< "FE cloud mode: "
<< (master_info.__isset.meta_service_endpoint ? "true" : "false")
<< ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false");
return Status::InvalidArgument<false>(
"fe and be do not work in same mode, fe cloud mode: {},"
" be cloud mode: {}",
master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
}

if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() &&
!master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint << " "
<< st;
}

if (master_info.__isset.cloud_instance_id) {
if (!config::cloud_instance_id.empty() &&
config::cloud_instance_id != master_info.cloud_instance_id) {
return Status::InvalidArgument(
"cloud_instance_id in fe.conf and be.conf are not same, fe: {}, be: {}",
master_info.cloud_instance_id, config::cloud_instance_id);
if (master_info.__isset.meta_service_endpoint) {
if (config::meta_service_endpoint.empty() && !master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint
<< " " << st;
}

if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
config::set_cloud_unique_id(master_info.cloud_instance_id);
LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " "
<< st;
if (master_info.meta_service_endpoint != config::meta_service_endpoint) {
LOG(WARNING) << "Detected mismatch in meta_service_endpoint configuration between FE "
"and BE. "
<< "FE meta_service_endpoint: " << master_info.meta_service_endpoint
<< ", BE meta_service_endpoint: " << config::meta_service_endpoint;
return Status::InvalidArgument<false>(
"fe and be do not work in same mode, fe meta_service_endpoint: {},"
" be meta_service_endpoint: {}",
master_info.meta_service_endpoint, config::meta_service_endpoint);
}
}

if (master_info.__isset.cloud_unique_id &&
config::cloud_unique_id != master_info.cloud_unique_id &&
config::enable_use_cloud_unique_id_from_fe) {
auto st = config::set_config("cloud_unique_id", master_info.cloud_unique_id, true);
LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
}

return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ Status CloudBaseCompaction::modify_rowsets() {
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap,
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(), _input_rowsets.front()->start_version(),
Expand Down
87 changes: 84 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "cloud/cloud_cumulative_compaction.h"

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
Expand All @@ -27,6 +28,7 @@
#include "olap/compaction.h"
#include "olap/cumulative_compaction_policy.h"
#include "service/backend_options.h"
#include "util/debug_points.h"
#include "util/trace.h"
#include "util/uuid_generator.h"

Expand Down Expand Up @@ -254,13 +256,13 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
int64_t initiator =
HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max();
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap,
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(), _input_rowsets.front()->start_version(),
Expand Down Expand Up @@ -340,9 +342,88 @@ Status CloudCumulativeCompaction::modify_rowsets() {
stats.num_rows(), stats.data_size());
}
}
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) {
process_old_version_delete_bitmap();
}
return Status::OK();
}

void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
// agg previously rowset old version delete bitmap
std::vector<RowsetSharedPtr> pre_rowsets {};
std::vector<std::string> pre_rowset_ids {};
for (const auto& it : cloud_tablet()->rowset_map()) {
if (it.first.second < _input_rowsets.front()->start_version()) {
pre_rowsets.emplace_back(it.second);
pre_rowset_ids.emplace_back(it.second->rowset_id().to_string());
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
auto pre_max_version = _output_rowset->version().second;
DeleteBitmapPtr new_delete_bitmap =
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
DeleteBitmap::BitmapKey before_end {rowset->rowset_id(), seg_id,
pre_max_version - 1};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(
std::make_tuple(_tablet->tablet_id(), start, before_end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
Status update_st;
DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed",
{
update_st = Status::InternalError(
"test fail to update delete bitmap for tablet_id {}",
cloud_tablet()->tablet_id());
});
if (update_st.ok()) {
update_st = _engine.meta_mgr().update_delete_bitmap_without_lock(
*cloud_tablet(), new_delete_bitmap.get());
}
if (!update_st.ok()) {
std::stringstream ss;
ss << "failed to update delete bitmap for tablet=" << cloud_tablet()->tablet_id()
<< " st=" << update_st.to_string();
std::string msg = ss.str();
LOG(WARNING) << msg;
} else {
Version version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second);
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<CloudTablet*>(_tablet.get())
->delete_expired_stale_rowsets();
});
}
}
}
}

void CloudCumulativeCompaction::garbage_collection() {
CloudCompactionMixin::garbage_collection();
cloud::TabletJobInfoPB job;
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {

void update_cumulative_point();

void process_old_version_delete_bitmap();

ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; }

std::string _uuid;
Expand Down
7 changes: 3 additions & 4 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ namespace doris {

CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
int64_t compaction_min_size, int64_t promotion_version_count)
int64_t compaction_min_size)
: _promotion_size(promotion_size),
_promotion_ratio(promotion_ratio),
_promotion_min_size(promotion_min_size),
_compaction_min_size(compaction_min_size),
_promotion_version_count(promotion_version_count) {}
_compaction_min_size(compaction_min_size) {}

int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
if (size < 1024) return 0;
Expand Down Expand Up @@ -205,7 +204,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
// consider it's version count here.
bool satisfy_promotion_version = tablet->enable_unique_key_merge_on_write() &&
output_rowset->end_version() - output_rowset->start_version() >
_promotion_version_count;
config::compaction_promotion_version_count;
// if rowsets have delete version, move to the last directly.
// if rowsets have no delete version, check output_rowset total disk size satisfies promotion size.
return output_rowset->start_version() == last_cumulative_point &&
Expand Down
5 changes: 1 addition & 4 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
int64_t promotion_size = config::compaction_promotion_size_mbytes * 1024 * 1024,
double promotion_ratio = config::compaction_promotion_ratio,
int64_t promotion_min_size = config::compaction_promotion_min_size_mbytes * 1024 * 1024,
int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024,
int64_t promotion_version_count = config::compaction_promotion_version_count);
int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024);

~CloudSizeBasedCumulativeCompactionPolicy() override = default;

Expand Down Expand Up @@ -94,8 +93,6 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
int64_t _promotion_min_size;
/// lower bound size to do compaction compaction.
int64_t _compaction_min_size;
// cumulative compaction promotion version count, only works for unique key MoW table
int64_t _promotion_version_count;
};

class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy {
Expand Down
Loading

0 comments on commit 7dad3c7

Please sign in to comment.