Skip to content

Commit

Permalink
fix review two
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng committed Oct 28, 2024
1 parent 3c569bf commit f49234c
Show file tree
Hide file tree
Showing 13 changed files with 39 additions and 43 deletions.
12 changes: 4 additions & 8 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ostream>
#include <string>

#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
Expand Down Expand Up @@ -275,14 +276,9 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
}

if (master_info.__isset.cloud_tablet_report_exceed_time_limit &&
config::cloud_tablet_report_exceed_time_limit <= 0) {
// The BE has not set this config (the default), so use the value from the FE heartbeat.
auto st = config::set_config(
"cloud_tablet_report_exceed_time_limit",
std::to_string(master_info.cloud_tablet_report_exceed_time_limit), true);
LOG(INFO) << "set config cloud_tablet_report_exceed_time_limit "
<< master_info.cloud_tablet_report_exceed_time_limit << " " << st;
if (master_info.__isset.tablet_report_inactive_duration_ms) {
doris::g_tablet_report_inactive_duration_ms =
master_info.tablet_report_inactive_duration_ms;
}

if (need_report) {
Expand Down
6 changes: 3 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1154,11 +1154,11 @@ void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& maste

increase_report_version();
uint64_t report_version;
uint64_t tablet_num = 0;
uint64_t total_num_tablets = 0;
for (int i = 0; i < 5; i++) {
request.tablets.clear();
report_version = s_report_version;
engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &tablet_num);
engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets);
if (report_version == s_report_version) {
break;
}
Expand All @@ -1171,7 +1171,7 @@ void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& maste
}

request.__set_report_version(report_version);
request.__set_num_tablets(tablet_num);
request.__set_num_tablets(total_num_tablets);

bool succ = handle_report(request, master_info, "tablet");
report_tablet_total << 1;
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class CloudTablet final : public BaseTablet {
int64_t last_base_compaction_success_time_ms = 0;
int64_t last_cumu_compaction_success_time_ms = 0;
int64_t last_cumu_no_suitable_version_ms = 0;
int64_t last_cache_release_ms = 0;
int64_t last_access_time_ms = 0;

// Return merged extended schema
TabletSchemaSPtr merged_tablet_schema() const override;
Expand Down
38 changes: 19 additions & 19 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "runtime/memory/cache_policy.h"

namespace doris {
uint64_t g_tablet_report_inactive_duration_ms = 0;
namespace {

// port from
Expand Down Expand Up @@ -142,6 +143,12 @@ CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine)

CloudTabletMgr::~CloudTabletMgr() = default;

// Stamps `tablet->last_access_time_ms` with the current std::chrono::system_clock
// time, expressed as milliseconds since that clock's epoch.
// `tablet` is dereferenced unconditionally, so it must not be null.
void set_tablet_access_time_ms(CloudTablet* tablet) {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    tablet->last_access_time_ms =
            std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
}

Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id,
bool warmup_data) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
Expand Down Expand Up @@ -183,10 +190,7 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
CachePriority::NORMAL);
auto ret =
std::shared_ptr<CloudTablet>(tablet.get(), [this, handle](CloudTablet* tablet) {
int64_t now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
tablet->last_cache_release_ms = now;
set_tablet_access_time_ms(tablet);
_cache->release(handle);
});
_tablet_map->put(std::move(tablet));
Expand All @@ -197,15 +201,14 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
if (tablet == nullptr) {
return ResultError(Status::InternalError("failed to get tablet {}", tablet_id));
}
set_tablet_access_time_ms(tablet.get());
return tablet;
}

CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(_cache->value(handle))->tablet.get();
set_tablet_access_time_ms(tablet_raw_ptr);
auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, [this, handle](CloudTablet* tablet) {
int64_t now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
tablet->last_cache_release_ms = now;
set_tablet_access_time_ms(tablet);
_cache->release(handle);
});
return tablet;
Expand Down Expand Up @@ -381,11 +384,9 @@ void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
(*tablet_num)++;
TTabletInfo tablet_info;
tablet->build_tablet_report_info(&tablet_info);
int64_t now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (now - config::cloud_tablet_report_exceed_time_limit * 1000 <
tablet->last_cache_release_ms) {
using namespace std::chrono;
int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (now - g_tablet_report_inactive_duration_ms * 1000 < tablet->last_access_time_ms) {
// The tablet has been accessed recently and is still in use, so do not report it.
return;
}
Expand All @@ -406,18 +407,17 @@ void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
<< " exceed drop time limit count=" << tablets_info->size();
}

void CloudTabletMgr::obtain_specific_quantity_tablets(std::vector<TabletInfo>& tablets_info,
int64_t num) {
void CloudTabletMgr::get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>* tablets_info) {
auto weak_tablets = get_weak_tablets();
for (auto& weak_tablet : weak_tablets) {
auto t = weak_tablet.lock();
if (t == nullptr) {
auto tablet = weak_tablet.lock();
if (tablet == nullptr) {
continue;
}
if (tablets_info.size() >= num) {
if (tablets_info->size() >= num_tablets) {
return;
}
tablets_info.push_back(t->get_tablet_info());
tablets_info->push_back(tablet->get_tablet_info());
}
}

Expand Down
10 changes: 9 additions & 1 deletion be/src/cloud/cloud_tablet_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class CloudStorageEngine;
class LRUCachePolicy;
class CountDownLatch;

extern uint64_t g_tablet_report_inactive_duration_ms;

class CloudTabletMgr {
public:
CloudTabletMgr(CloudStorageEngine& engine);
Expand Down Expand Up @@ -68,10 +70,16 @@ class CloudTabletMgr {
std::vector<std::shared_ptr<CloudTablet>>* tablets,
int64_t* max_score);

/**
* Builds the tablet info to be reported and counts the total number of tablets.
*
* @param tablets_info output: the tablets to include in the report
* @param tablet_num output: total number of tablets in the BE tablet manager
*/
void build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info,
uint64_t* tablet_num);

void obtain_specific_quantity_tablets(std::vector<TabletInfo>& tablets_info, int64_t num);
void get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>* tablets_info);

private:
CloudStorageEngine& _engine;
Expand Down
2 changes: 0 additions & 2 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,4 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");

DEFINE_mBool(enable_cloud_tablet_report, "true");

DEFINE_mInt32(cloud_tablet_report_exceed_time_limit, "0");
} // namespace doris::config
1 change: 0 additions & 1 deletion be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,4 @@ DECLARE_mBool(enable_use_cloud_unique_id_from_fe);

DECLARE_Bool(enable_cloud_tablet_report);

DECLARE_mInt32(cloud_tablet_report_exceed_time_limit);
} // namespace doris::config
2 changes: 1 addition & 1 deletion be/src/http/action/tablets_info_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) {
} else {
CloudTabletMgr& cloud_tablet_manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr();
cloud_tablet_manager.obtain_specific_quantity_tablets(tablets_info, number);
cloud_tablet_manager.get_tablet_info(number, &tablets_info);
}

tablets_info_ej["msg"] = msg;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "gutil/strings/strcat.h"
#include "gutil/strings/substitute.h"
#include "io/fs/local_file_system.h"
#include "olap/base_tablet.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/base_tablet.h"
#include "olap/data_dir.h"
#include "olap/delta_writer.h"
#include "olap/olap_common.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ private static void deleteFromBackend(long backendId, Set<Long> tabletIdsWillDro
int deleteFromBackendCounter = 0;
AgentBatchTask batchTask = new AgentBatchTask();
for (Long tabletId : tabletIdsWillDrop) {
if (LOG.isDebugEnabled()) {
LOG.debug("process tablet [{}], backend[{}]", tabletId, backendId);
}
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, -1, -1, false);
batchTask.addTask(task);
LOG.info("delete tablet[{}] from backend[{}]", tabletId, backendId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public HeartbeatResponse call() {
if (Config.isCloudMode()) {
String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID);
copiedMasterInfo.setCloudUniqueId(cloudUniqueId);
copiedMasterInfo.setCloudTabletReportExceedTimeLimit(Config.rehash_tablet_after_be_dead_seconds);
copiedMasterInfo.setTabletReportInactiveDurationMs(Config.rehash_tablet_after_be_dead_seconds);
}
THeartbeatResult result;
if (!FeConstants.runningUnitTest) {
Expand Down
2 changes: 1 addition & 1 deletion gensrc/thrift/HeartbeatService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct TMasterInfo {
10: optional string meta_service_endpoint;
11: optional string cloud_unique_id;
// See configuration item Config.java rehash_tablet_after_be_dead_seconds for meaning
12: optional i64 cloud_tablet_report_exceed_time_limit;
12: optional i64 tablet_report_inactive_duration_ms;
}

struct TBackendInfo {
Expand Down

0 comments on commit f49234c

Please sign in to comment.