Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature](cloud) Add cloud report for clean up expired tablets #42066

Merged
merged 5 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "agent/utils.h"
#include "agent/workload_group_listener.h"
#include "agent/workload_sched_policy_listener.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
Expand Down Expand Up @@ -193,7 +194,7 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
"REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds, [&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_OLAP_TABLE", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); }));
"REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); }));
deardeng marked this conversation as resolved.
Show resolved Hide resolved
// clang-format on
}

Expand All @@ -211,13 +212,25 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_
"CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
[&engine](auto&& task) { return calc_delete_bitmap_callback(engine, task); });

// cloud, drop tablet just clean clear_cache, so just one thread do it
_workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>(
"DROP_TABLE", 1, [&engine](auto&& task) { return drop_tablet_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds,
[&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); }));

if (config::enable_cloud_tablet_report) {
deardeng marked this conversation as resolved.
Show resolved Hide resolved
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds,
[&engine, &master_info = _master_info] {
report_tablet_callback(engine, master_info);
}));
}
}

// TODO(lingbin): each task in the batch may have it own status or FE must check and
Expand Down
6 changes: 6 additions & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ostream>
#include <string>

#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
Expand Down Expand Up @@ -275,6 +276,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
}

if (master_info.__isset.tablet_report_inactive_duration_ms) {
doris::g_tablet_report_inactive_duration_ms =
master_info.tablet_report_inactive_duration_ms;
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_engine.notify_listeners();
Expand Down
61 changes: 61 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
#include "cloud/cloud_delete_task.h"
#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
#include "cloud/cloud_schema_change_job.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
Expand Down Expand Up @@ -116,6 +118,10 @@ bool register_task_info(const TTaskType::type task_type, int64_t signature) {
// no need to report task of these types
return true;
}
if (task_type == TTaskType::type::DROP && config::is_cloud_mode()) {
// cloud no need to report drop task status
return true;
}

if (signature == -1) { // No need to report task with unintialized signature
return true;
Expand Down Expand Up @@ -1134,6 +1140,46 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
}
}

void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& master_info) {
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests at the same time
// and can not be processed.
if (config::report_random_wait) {
random_sleep(5);
}

TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;

increase_report_version();
uint64_t report_version;
uint64_t total_num_tablets = 0;
for (int i = 0; i < 5; i++) {
deardeng marked this conversation as resolved.
Show resolved Hide resolved
request.tablets.clear();
report_version = s_report_version;
engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets);
if (report_version == s_report_version) {
break;
}
}

if (report_version < s_report_version) {
deardeng marked this conversation as resolved.
Show resolved Hide resolved
LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
return;
}

request.__set_report_version(report_version);
request.__set_num_tablets(total_num_tablets);

bool succ = handle_report(request, master_info, "tablet");
report_tablet_total << 1;
if (!succ) [[unlikely]] {
report_tablet_failed << 1;
}
}

void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
const auto& upload_request = req.upload_req;

Expand Down Expand Up @@ -1610,6 +1656,21 @@ void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
remove_task_info(req.task_type, req.signature);
}

void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
const auto& drop_tablet_req = req.drop_tablet_req;
DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", {
LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
.tag("tablet_id", drop_tablet_req.tablet_id);
return;
});
// 1. erase lru from tablet mgr
// TODO(dx) clean tablet file cache
// get tablet's info(such as cachekey, tablet id, rsid)
engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
// 2. gen clean file cache task
return;
deardeng marked this conversation as resolved.
Show resolved Hide resolved
deardeng marked this conversation as resolved.
Show resolved Hide resolved
}

void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
const auto& push_req = req.push_req;

Expand Down
4 changes: 4 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req)

void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);

void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void push_callback(StorageEngine& engine, const TAgentTaskRequest& req);
Expand Down Expand Up @@ -188,6 +190,8 @@ void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_

void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info);

void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& master_info);

void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);

} // namespace doris
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,4 +872,12 @@ Status CloudTablet::sync_meta() {
return Status::OK();
}

void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
std::shared_lock rdlock(_meta_lock);
tablet_info->__set_total_version_count(_tablet_meta->version_count());
tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
// Currently, this information will not be used by the cloud report,
// but it may be used in the future.
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,13 @@ class CloudTablet final : public BaseTablet {
int64_t last_base_compaction_success_time_ms = 0;
int64_t last_cumu_compaction_success_time_ms = 0;
int64_t last_cumu_no_suitable_version_ms = 0;
int64_t last_access_time_ms = 0;

// Return merged extended schema
TabletSchemaSPtr merged_tablet_schema() const override;

void build_tablet_report_info(TTabletInfo* tablet_info);

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);
Expand Down
72 changes: 68 additions & 4 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "runtime/memory/cache_policy.h"

namespace doris {
uint64_t g_tablet_report_inactive_duration_ms = 0;
namespace {

// port from
Expand Down Expand Up @@ -142,6 +143,12 @@ CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine)

CloudTabletMgr::~CloudTabletMgr() = default;

void set_tablet_access_time_ms(CloudTablet* tablet) {
using namespace std::chrono;
int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
tablet->last_access_time_ms = now;
}

Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id,
bool warmup_data) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
Expand Down Expand Up @@ -181,8 +188,11 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i

auto* handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet),
CachePriority::NORMAL);
auto ret = std::shared_ptr<CloudTablet>(
tablet.get(), [this, handle](...) { _cache->release(handle); });
auto ret =
std::shared_ptr<CloudTablet>(tablet.get(), [this, handle](CloudTablet* tablet) {
set_tablet_access_time_ms(tablet);
_cache->release(handle);
});
_tablet_map->put(std::move(tablet));
return ret;
};
Expand All @@ -191,12 +201,16 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
if (tablet == nullptr) {
return ResultError(Status::InternalError("failed to get tablet {}", tablet_id));
}
set_tablet_access_time_ms(tablet.get());
return tablet;
}

CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(_cache->value(handle))->tablet.get();
auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr,
[this, handle](...) { _cache->release(handle); });
set_tablet_access_time_ms(tablet_raw_ptr);
auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, [this, handle](CloudTablet* tablet) {
set_tablet_access_time_ms(tablet);
_cache->release(handle);
});
return tablet;
}

Expand Down Expand Up @@ -357,4 +371,54 @@ Status CloudTabletMgr::get_topn_tablets_to_compact(
return Status::OK();
}

void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info,
uint64_t* tablet_num) {
DCHECK(tablets_info != nullptr);
VLOG_NOTICE << "begin to build all report cloud tablets info";

HistogramStat tablet_version_num_hist;

auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) {
auto tablet = tablet_wk.lock();
if (!tablet) return;
(*tablet_num)++;
TTabletInfo tablet_info;
tablet->build_tablet_report_info(&tablet_info);
using namespace std::chrono;
int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (now - g_tablet_report_inactive_duration_ms * 1000 < tablet->last_access_time_ms) {
// the tablet is still being accessed and used in recently, so not report it
return;
}
auto& t_tablet = (*tablets_info)[tablet->tablet_id()];
// On the cloud, a specific BE has only one tablet replica;
// there are no multiple replicas for a specific BE.
// This is only to reuse the non-cloud report protocol.
tablet_version_num_hist.add(tablet_info.total_version_count);
t_tablet.tablet_infos.emplace_back(std::move(tablet_info));
};

auto weak_tablets = get_weak_tablets();
std::for_each(weak_tablets.begin(), weak_tablets.end(), handler);

DorisMetrics::instance()->tablet_version_num_distribution->set_histogram(
tablet_version_num_hist);
LOG(INFO) << "success to build all cloud report tablets info. all_tablet_count=" << *tablet_num
deardeng marked this conversation as resolved.
Show resolved Hide resolved
<< " exceed drop time limit count=" << tablets_info->size();
}

void CloudTabletMgr::get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>* tablets_info) {
auto weak_tablets = get_weak_tablets();
for (auto& weak_tablet : weak_tablets) {
auto tablet = weak_tablet.lock();
if (tablet == nullptr) {
continue;
}
if (tablets_info->size() >= num_tablets) {
return;
}
tablets_info->push_back(tablet->get_tablet_info());
}
}

} // namespace doris
16 changes: 16 additions & 0 deletions be/src/cloud/cloud_tablet_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#pragma once

#include <gen_cpp/MasterService_types.h>
deardeng marked this conversation as resolved.
Show resolved Hide resolved
#include <gen_cpp/Types_types.h>

#include <functional>
#include <memory>
#include <vector>
Expand All @@ -31,6 +34,8 @@ class CloudStorageEngine;
class LRUCachePolicy;
class CountDownLatch;

extern uint64_t g_tablet_report_inactive_duration_ms;

class CloudTabletMgr {
public:
CloudTabletMgr(CloudStorageEngine& engine);
Expand Down Expand Up @@ -65,6 +70,17 @@ class CloudTabletMgr {
std::vector<std::shared_ptr<CloudTablet>>* tablets,
int64_t* max_score);

/**
* Gets tablets info and total tablet num that are reported
*
* @param tablets_info used by report
* @param tablet_num tablets in be tabletMgr, total num
*/
void build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info,
deardeng marked this conversation as resolved.
Show resolved Hide resolved
uint64_t* tablet_num);

void get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>* tablets_info);

private:
CloudStorageEngine& _engine;

Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,5 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");

DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");

DEFINE_mBool(enable_cloud_tablet_report, "true");
} // namespace doris::config
2 changes: 2 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,6 @@ DECLARE_mInt32(tablet_txn_info_min_expired_seconds);

DECLARE_mBool(enable_use_cloud_unique_id_from_fe);

DECLARE_Bool(enable_cloud_tablet_report);

} // namespace doris::config
20 changes: 11 additions & 9 deletions be/src/http/action/tablets_info_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <string>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
Expand Down Expand Up @@ -51,12 +53,6 @@ void TabletsInfoAction::handle(HttpRequest* req) {

EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) {
EasyJson tablets_info_ej;
if (config::is_cloud_mode()) {
// TODO(plat1ko): CloudStorageEngine
tablets_info_ej["msg"] = "TabletsInfoAction::get_tablets_info is not implemented";
tablets_info_ej["code"] = 0;
return tablets_info_ej;
}

int64_t number;
std::string msg;
Expand All @@ -74,9 +70,15 @@ EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) {
msg = "Parameter Error";
}
std::vector<TabletInfo> tablets_info;
TabletManager* tablet_manager =
ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager();
tablet_manager->obtain_specific_quantity_tablets(tablets_info, number);
if (!config::is_cloud_mode()) {
TabletManager* tablet_manager =
ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager();
tablet_manager->obtain_specific_quantity_tablets(tablets_info, number);
} else {
CloudTabletMgr& cloud_tablet_manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr();
cloud_tablet_manager.get_tablet_info(number, &tablets_info);
}

tablets_info_ej["msg"] = msg;
tablets_info_ej["code"] = 0;
Expand Down
Loading
Loading