From ed50d915803596b5645e3378a7d20bb5722c1541 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Sat, 12 Oct 2024 16:16:40 +0800 Subject: [PATCH 1/5] [Feature](cloud) Add cloud report for clean up expired tablets --- be/src/agent/agent_server.cpp | 15 ++- be/src/agent/heartbeat_server.cpp | 10 ++ be/src/agent/task_worker_pool.cpp | 56 +++++++++++ be/src/agent/task_worker_pool.h | 4 + be/src/cloud/cloud_tablet.cpp | 8 ++ be/src/cloud/cloud_tablet.h | 3 + be/src/cloud/cloud_tablet_mgr.cpp | 54 ++++++++++- be/src/cloud/cloud_tablet_mgr.h | 6 ++ be/src/cloud/config.cpp | 3 + be/src/cloud/config.h | 3 + .../doris/cloud/CacheHotspotManager.java | 4 +- .../cloud/catalog/CloudTabletRebalancer.java | 42 +++++++-- .../cloud/master/CloudReportHandler.java | 92 +++++++++++++++++++ .../org/apache/doris/master/MasterImpl.java | 3 +- .../apache/doris/master/ReportHandler.java | 19 +++- .../org/apache/doris/qe/StmtExecutor.java | 2 +- .../org/apache/doris/system/HeartbeatMgr.java | 1 + .../apache/doris/clone/RepairVersionTest.java | 10 +- gensrc/thrift/HeartbeatService.thrift | 2 + gensrc/thrift/MasterService.thrift | 2 + 20 files changed, 314 insertions(+), 25 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 9d36148b64f305..361a8ab93a90a6 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -33,6 +33,7 @@ #include "agent/utils.h" #include "agent/workload_group_listener.h" #include "agent/workload_sched_policy_listener.h" +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" @@ -193,7 +194,7 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) { "REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds, [&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); })); _report_workers.push_back(std::make_unique( - "REPORT_OLAP_TABLE", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); })); + "REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); })); // clang-format on } @@ -211,6 +212,10 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_ "CALC_DBM_TASK", config::calc_delete_bitmap_worker_count, [&engine](auto&& task) { return calc_delete_bitmap_callback(engine, task); }); + // cloud, drop tablet just clean clear_cache, so just one thread do it + _workers[TTaskType::DROP] = std::make_unique( + "DROP_TABLE", 1, [&engine](auto&& task) { return drop_tablet_callback(engine, task); }); + _report_workers.push_back(std::make_unique( "REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); })); @@ -218,6 +223,14 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_ _report_workers.push_back(std::make_unique( "REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds, [&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); })); + + if (config::enable_cloud_tablet_report) { + _report_workers.push_back(std::make_unique( + "REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds, + [&engine, &master_info = _master_info] { + report_tablet_callback(engine, master_info); + })); + } } // TODO(lingbin): each task in the batch may have it own status or FE must check and diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 146604aaab20f4..b4188e4fb3840e 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -275,6 +275,16 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st; } + if (master_info.__isset.cloud_tablet_report_exceed_time_limit && + config::cloud_tablet_report_exceed_time_limit <= 0) { + // be not set, use fe heartbeat, default be not set it + auto st = config::set_config( + "cloud_tablet_report_exceed_time_limit", + std::to_string(master_info.cloud_tablet_report_exceed_time_limit), true); + LOG(INFO) << "set config cloud_tablet_report_exceed_time_limit " + << master_info.cloud_tablet_report_exceed_time_limit << " " << st; + } + if (need_report) { LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately"; _engine.notify_listeners(); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 5906511ce15794..ff8eee2e5bd300 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -48,6 +48,8 @@ #include "cloud/cloud_delete_task.h" #include "cloud/cloud_engine_calc_delete_bitmap_task.h" #include "cloud/cloud_schema_change_job.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" @@ -116,6 +118,10 @@ bool register_task_info(const TTaskType::type task_type, int64_t signature) { // no need to report task of these types return true; } + if (task_type == TTaskType::type::DROP && config::is_cloud_mode()) { + // cloud no need to report drop task status + return true; + } if (signature == -1) { // No need to report task with unintialized signature return true; @@ -1134,6 +1140,46 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf } } +void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& master_info) { + // Random sleep 1~5 seconds before doing report. + // In order to avoid the problem that the FE receives many report requests at the same time + // and can not be processed. + if (config::report_random_wait) { + random_sleep(5); + } + + TReportRequest request; + request.__set_backend(BackendOptions::get_local_backend()); + request.__isset.tablets = true; + + increase_report_version(); + uint64_t report_version; + uint64_t tablet_num = 0; + for (int i = 0; i < 5; i++) { + request.tablets.clear(); + report_version = s_report_version; + engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &tablet_num); + if (report_version == s_report_version) { + break; + } + } + + if (report_version < s_report_version) { + LOG(WARNING) << "report version " << report_version << " change to " << s_report_version; + DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); + return; + } + + request.__set_report_version(report_version); + request.__set_num_tablets(tablet_num); + + bool succ = handle_report(request, master_info, "tablet"); + report_tablet_total << 1; + if (!succ) [[unlikely]] { + report_tablet_failed << 1; + } +} + void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { const auto& upload_request = req.upload_req; @@ -1610,6 +1656,16 @@ void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { remove_task_info(req.task_type, req.signature); } +void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { + const auto& drop_tablet_req = req.drop_tablet_req; + // 1. erase lru from tablet mgr + // TODO(dx) clean tablet file cache + // get tablet's info(such as cachekey, tablet id, rsid) + engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id); + // 2. gen clean file cache task + return; +} + void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& push_req = req.push_req; diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index f51d6c2a4c0dc0..c50ac57ffe9b74 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -155,6 +155,8 @@ void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req); +void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req); + void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req); void push_callback(StorageEngine& engine, const TAgentTaskRequest& req); @@ -188,6 +190,8 @@ void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info); +void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& master_info); + void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req); } // namespace doris diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index c046259b0da71c..5b9c2d5a9e33f8 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -872,4 +872,12 @@ Status CloudTablet::sync_meta() { return Status::OK(); } +void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) { + std::shared_lock rdlock(_meta_lock); + tablet_info->__set_total_version_count(_tablet_meta->version_count()); + tablet_info->__set_tablet_id(_tablet_meta->tablet_id()); + // Currently, this information will not be used by the cloud report, + // but it may be used in the future. +} + } // namespace doris diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 53747dc19e27de..ef89afefd5a115 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -196,10 +196,13 @@ class CloudTablet final : public BaseTablet { int64_t last_base_compaction_success_time_ms = 0; int64_t last_cumu_compaction_success_time_ms = 0; int64_t last_cumu_no_suitable_version_ms = 0; + int64_t last_cache_release_ms = 0; // Return merged extended schema TabletSchemaSPtr merged_tablet_schema() const override; + void build_tablet_report_info(TTabletInfo* tablet_info); + private: // FIXME(plat1ko): No need to record base size if rowsets are ordered by version void update_base_size(const Rowset& rs); diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index e5c31785c1eb1c..42744913c3d268 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -181,8 +181,14 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i auto* handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet), CachePriority::NORMAL); - auto ret = std::shared_ptr( - tablet.get(), [this, handle](...) { _cache->release(handle); }); + auto ret = + std::shared_ptr(tablet.get(), [this, handle](CloudTablet* tablet) { + int64_t now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + tablet->last_cache_release_ms = now; + _cache->release(handle); + }); _tablet_map->put(std::move(tablet)); return ret; }; @@ -195,8 +201,13 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i } CloudTablet* tablet_raw_ptr = reinterpret_cast(_cache->value(handle))->tablet.get(); - auto tablet = std::shared_ptr(tablet_raw_ptr, - [this, handle](...) { _cache->release(handle); }); + auto tablet = std::shared_ptr(tablet_raw_ptr, [this, handle](CloudTablet* tablet) { + int64_t now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + tablet->last_cache_release_ms = now; + _cache->release(handle); + }); return tablet; } @@ -357,4 +368,39 @@ Status CloudTabletMgr::get_topn_tablets_to_compact( return Status::OK(); } +void CloudTabletMgr::build_all_report_tablets_info(std::map* tablets_info, + uint64_t* tablet_num) { + DCHECK(tablets_info != nullptr); + VLOG_NOTICE << "begin to build all report cloud tablets info"; + + HistogramStat tablet_version_num_hist; + + auto handler = [&](const std::weak_ptr& tablet_wk) { + auto tablet = tablet_wk.lock(); + if (!tablet) return; + (*tablet_num)++; + TTabletInfo tablet_info; + tablet->build_tablet_report_info(&tablet_info); + if (::time(nullptr) - config::cloud_tablet_report_exceed_time_limit < + tablet->last_cache_release_ms / 1000) { + // the tablet is still being accessed and used in recently, so not report it + return; + } + auto& t_tablet = (*tablets_info)[tablet->tablet_id()]; + // On the cloud, a specific BE has only one tablet replica; + // there are no multiple replicas for a specific BE. + // This is only to reuse the non-cloud report protocol. + tablet_version_num_hist.add(tablet_info.total_version_count); + t_tablet.tablet_infos.emplace_back(std::move(tablet_info)); + }; + + auto weak_tablets = get_weak_tablets(); + std::for_each(weak_tablets.begin(), weak_tablets.end(), handler); + + DorisMetrics::instance()->tablet_version_num_distribution->set_histogram( + tablet_version_num_hist); + LOG(INFO) << "success to build all cloud report tablets info. all_tablet_count=" << *tablet_num + << " exceed drop time limit count=" << tablets_info->size(); +} + } // namespace doris diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h index 976d483b36c143..2a588602bdc7aa 100644 --- a/be/src/cloud/cloud_tablet_mgr.h +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -17,6 +17,9 @@ #pragma once +#include +#include + #include #include #include @@ -65,6 +68,9 @@ class CloudTabletMgr { std::vector>* tablets, int64_t* max_score); + void build_all_report_tablets_info(std::map* tablets_info, + uint64_t* tablet_num); + private: CloudStorageEngine& _engine; diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index e724dbea84e10c..19c1add821c8ac 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -75,4 +75,7 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120"); DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true"); +DEFINE_mBool(enable_cloud_tablet_report, "true"); + +DEFINE_mInt32(cloud_tablet_report_exceed_time_limit, "0"); } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 86197f924d0cad..f504d2c9d5c843 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -108,4 +108,7 @@ DECLARE_mInt32(tablet_txn_info_min_expired_seconds); DECLARE_mBool(enable_use_cloud_unique_id_from_fe); +DECLARE_mBool(enable_cloud_tablet_report); + +DECLARE_mInt32(cloud_tablet_report_exceed_time_limit); } // namespace doris::config diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index b35a3b9e911416..0b83baa94d6d4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -429,7 +429,7 @@ private Map> warmUpNewClusterByCluster(String dstClusterName, for (Backend backend : backends) { Set beTabletIds = ((CloudEnv) Env.getCurrentEnv()) .getCloudTabletRebalancer() - .getSnapshotTabletsByBeId(backend.getId()); + .getSnapshotTabletsInPrimaryByBeId(backend.getId()); List warmUpTablets = new ArrayList<>(); for (Tablet tablet : tablets) { if (beTabletIds.contains(tablet.getId())) { @@ -559,7 +559,7 @@ private Map> warmUpNewClusterByTable(long jobId, String dstCl for (Backend backend : backends) { Set beTabletIds = ((CloudEnv) Env.getCurrentEnv()) .getCloudTabletRebalancer() - .getSnapshotTabletsByBeId(backend.getId()); + .getSnapshotTabletsInPrimaryByBeId(backend.getId()); List warmUpTablets = new ArrayList<>(); for (Tablet tablet : tablets) { if (beTabletIds.contains(tablet.getId())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 78947afdb11e39..8e5033470b0e90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -73,6 +73,10 @@ public class CloudTabletRebalancer extends MasterDaemon { private volatile ConcurrentHashMap> beToColocateTabletsGlobal = new ConcurrentHashMap>(); + // used for cloud tablet report + private volatile ConcurrentHashMap> beToTabletsGlobalInSecondary = + new ConcurrentHashMap>(); + private Map> futureBeToTabletsGlobal; private Map> clusterToBes; @@ -164,7 +168,7 @@ private class TransferPairInfo { public boolean srcDecommissioned; } - public Set getSnapshotTabletsByBeId(Long beId) { + public Set getSnapshotTabletsInPrimaryByBeId(Long beId) { Set tabletIds = Sets.newHashSet(); List tablets = beToTabletsGlobal.get(beId); if (tablets != null) { @@ -183,6 +187,24 @@ public Set getSnapshotTabletsByBeId(Long beId) { return tabletIds; } + public Set getSnapshotTabletsInSecondaryByBeId(Long beId) { + Set tabletIds = Sets.newHashSet(); + List tablets = beToTabletsGlobalInSecondary.get(beId); + if (tablets != null) { + for (Tablet tablet : tablets) { + tabletIds.add(tablet.getId()); + } + } + return tabletIds; + } + + public Set getSnapshotTabletsInPrimaryAndSecondaryByBeId(Long beId) { + Set tabletIds = Sets.newHashSet(); + tabletIds.addAll(getSnapshotTabletsInPrimaryByBeId(beId)); + tabletIds.addAll(getSnapshotTabletsInSecondaryByBeId(beId)); + return tabletIds; + } + public int getTabletNumByBackendId(long beId) { List tablets = beToTabletsGlobal.get(beId); List colocateTablets = beToColocateTabletsGlobal.get(beId); @@ -617,6 +639,8 @@ public void fillBeToTablets(long be, long tableId, long partId, long indexId, Ta public void statRouteInfo() { ConcurrentHashMap> tmpBeToTabletsGlobal = new ConcurrentHashMap>(); + ConcurrentHashMap> tmpBeToTabletsGlobalInSecondary + = new ConcurrentHashMap>(); ConcurrentHashMap> tmpBeToColocateTabletsGlobal = new ConcurrentHashMap>(); @@ -641,11 +665,8 @@ public void statRouteInfo() { continue; } if (allBes.contains(beId)) { - List colocateTablets = tmpBeToColocateTabletsGlobal.get(beId); - if (colocateTablets == null) { - colocateTablets = new ArrayList(); - tmpBeToColocateTabletsGlobal.put(beId, colocateTablets); - } + List colocateTablets = + tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new ArrayList<>()); colocateTablets.add(tablet); } continue; @@ -657,6 +678,14 @@ public void statRouteInfo() { continue; } + Backend secondaryBe = replica.getSecondaryBackend(cluster); + long secondaryBeId = secondaryBe == null ? -1L : secondaryBe.getId(); + if (allBes.contains(secondaryBeId)) { + List tablets = tmpBeToTabletsGlobalInSecondary + .computeIfAbsent(secondaryBeId, k -> new ArrayList<>()); + tablets.add(tablet); + } + InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster); InfightTask task = tabletToInfightTask.get(taskKey); long futureBeId = task == null ? beId : task.destBe; @@ -670,6 +699,7 @@ public void statRouteInfo() { }); beToTabletsGlobal = tmpBeToTabletsGlobal; + beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary; beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java new file mode 100644 index 00000000000000..e8473ff1bc8a6f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.master; + +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.catalog.CloudEnv; +import org.apache.doris.master.ReportHandler; +import org.apache.doris.system.Backend; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.DropReplicaTask; +import org.apache.doris.thrift.TTablet; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class CloudReportHandler extends ReportHandler { + private static final Logger LOG = LogManager.getLogger(CloudReportHandler.class); + + @Override + public void tabletReport(long backendId, Map backendTablets, + Map backendPartitionsVersion, long backendReportVersion, long numTablets) { + long start = System.currentTimeMillis(); + LOG.info("backend[{}] have {} tablet(s), {} need deal tablet(s). report version: {}", + backendId, numTablets, backendTablets.size(), backendReportVersion); + // current be useful + Set tabletIdsInFe = ((CloudEnv) Env.getCurrentEnv()).getCloudTabletRebalancer() + .getSnapshotTabletsInPrimaryAndSecondaryByBeId(backendId); + + Set tabletIdsInBe = backendTablets.keySet(); + // handle (be - meta) + Set tabletIdsNeedDrop = diffTablets(tabletIdsInFe, tabletIdsInBe); + // drop agent task + deleteFromBackend(backendId, tabletIdsNeedDrop); + + Backend be = Env.getCurrentSystemInfo().getBackend(backendId); + LOG.info("finished to handle task report from backend {}-{}, " + + "diff task num: {}, cost: {} ms.", + backendId, be != null ? be.getHost() : "", + tabletIdsNeedDrop.size(), + (System.currentTimeMillis() - start)); + } + + // tabletIdsInFe, tablet is used in Primary or Secondary + // tabletIdsInBe, tablet report exceed time, need to check + // returns tabletIds need to drop + private Set diffTablets(Set tabletIdsInFe, Set tabletIdsInBe) { + // tabletsInBe - tabletsInFe + Set result = new HashSet<>(tabletIdsInBe); + result.removeAll(tabletIdsInFe); + return result; + } + + private static void deleteFromBackend(long backendId, Set tabletIdsWillDrop) { + int deleteFromBackendCounter = 0; + AgentBatchTask batchTask = new AgentBatchTask(); + for (Long tabletId : tabletIdsWillDrop) { + if (LOG.isDebugEnabled()) { + LOG.debug("process tablet [{}], backend[{}]", tabletId, backendId); + } + DropReplicaTask task = new DropReplicaTask(backendId, tabletId, -1, -1, false); + batchTask.addTask(task); + LOG.info("delete tablet[{}] from backend[{}]", tabletId, backendId); + ++deleteFromBackendCounter; + } + + if (batchTask.getTaskNum() != 0) { + AgentTaskExecutor.submit(batchTask); + } + + LOG.info("delete {} tablet(s) from backend[{}]", deleteFromBackendCounter, backendId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 35b6a230b24a76..a4bbe763f60946 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -30,6 +30,7 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.cloud.catalog.CloudTablet; +import org.apache.doris.cloud.master.CloudReportHandler; import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.load.DeleteJob; @@ -76,7 +77,7 @@ public class MasterImpl { private static final Logger LOG = LogManager.getLogger(MasterImpl.class); - private ReportHandler reportHandler = new ReportHandler(); + private ReportHandler reportHandler = Config.isCloudMode() ? new CloudReportHandler() : new ReportHandler(); public MasterImpl() { reportHandler.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index d773cb5850efed..1c8f51bd4ebf24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -162,6 +162,7 @@ public TMasterResult handleReport(TReportRequest request) throws TException { Map tablets = null; Map partitionsVersion = null; long reportVersion = -1; + long numTablets = 0; ReportType reportType = null; @@ -188,6 +189,12 @@ public TMasterResult handleReport(TReportRequest request) throws TException { Env.getCurrentSystemInfo().updateBackendReportVersion(beId, reportVersion, -1L, -1L, false); } + if (tablets == null) { + numTablets = request.isSetNumTablets() ? request.getNumTablets() : 0; + } else { + numTablets = request.isSetNumTablets() ? request.getNumTablets() : tablets.size(); + } + if (request.isSetPartitionsVersion()) { partitionsVersion = request.getPartitionsVersion(); } @@ -206,7 +213,7 @@ public TMasterResult handleReport(TReportRequest request) throws TException { ReportTask reportTask = new ReportTask(beId, reportType, tasks, disks, tablets, partitionsVersion, reportVersion, request.getStoragePolicy(), request.getResource(), request.getNumCores(), - request.getPipelineExecutorSize()); + request.getPipelineExecutorSize(), numTablets); try { putToQueue(reportTask); } catch (Exception e) { @@ -294,12 +301,13 @@ private class ReportTask extends MasterTask { private List storageResources; private int cpuCores; private int pipelineExecutorSize; + private long numTablets; public ReportTask(long beId, ReportType reportType, Map> tasks, Map disks, Map tablets, Map partitionsVersion, long reportVersion, List storagePolicies, List storageResources, int cpuCores, - int pipelineExecutorSize) { + int pipelineExecutorSize, long numTablets) { this.beId = beId; this.reportType = reportType; this.tasks = tasks; @@ -311,6 +319,7 @@ public ReportTask(long beId, ReportType reportType, Map> ta this.storageResources = storageResources; this.cpuCores = cpuCores; this.pipelineExecutorSize = pipelineExecutorSize; + this.numTablets = numTablets; } @Override @@ -336,7 +345,7 @@ protected void exec() { if (partitions == null) { partitions = Maps.newHashMap(); } - ReportHandler.tabletReport(beId, tablets, partitions, reportVersion); + tabletReport(beId, tablets, partitions, reportVersion, numTablets); } } } @@ -471,8 +480,8 @@ private static void diffResource(List storageResourcesInBe, Li } // public for fe ut - public static void tabletReport(long backendId, Map backendTablets, - Map backendPartitionsVersion, long backendReportVersion) { + public void tabletReport(long backendId, Map backendTablets, + Map backendPartitionsVersion, long backendReportVersion, long numTablets) { long start = System.currentTimeMillis(); LOG.info("backend[{}] reports {} tablet(s). report version: {}", backendId, backendTablets.size(), backendReportVersion); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 08ad231168b9f2..82140c5b6d7fc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2577,7 +2577,7 @@ public static void syncLoadForTablets(List> backendsList, List tabletIdList = new ArrayList(); Set beTabletIds = ((CloudEnv) Env.getCurrentEnv()) .getCloudTabletRebalancer() - .getSnapshotTabletsByBeId(backend.getId()); + .getSnapshotTabletsInPrimaryByBeId(backend.getId()); allTabletIds.forEach(tabletId -> { if (beTabletIds.contains(tabletId)) { tabletIdList.add(tabletId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 3fc09b31f2d312..0feca773571a6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -253,6 +253,7 @@ public HeartbeatResponse call() { if (Config.isCloudMode()) { String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID); copiedMasterInfo.setCloudUniqueId(cloudUniqueId); + copiedMasterInfo.setCloudTabletReportExceedTimeLimit(Config.rehash_tablet_after_be_dead_seconds); } THeartbeatResult result; if (!FeConstants.runningUnitTest) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java index 1ac497dbebe28d..423c839faa2775 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java @@ -105,8 +105,8 @@ public void testRepairLastFailedVersionByReport() throws Exception { Map tablets = Maps.newHashMap(); tablets.put(tablet.getId(), tTablet); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - - ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); + ReportHandler reportHandler = new ReportHandler(); + reportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L, tablets.size()); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); @@ -135,12 +135,12 @@ public void testVersionRegressive() throws Exception { tTablet.addToTabletInfos(tTabletInfo); Map tablets = Maps.newHashMap(); tablets.put(tablet.getId(), tTablet); - - ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); + ReportHandler reportHandler = new ReportHandler(); + reportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L, tablets.size()); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); + reportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L, tablets.size()); Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion()); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index c03f04a6543f22..56cbece82ab68f 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -41,6 +41,8 @@ struct TMasterInfo { 9: optional list frontend_infos 10: optional string meta_service_endpoint; 11: optional string cloud_unique_id; + // See configuration item Config.java rehash_tablet_after_be_dead_seconds for meaning + 12: optional i64 cloud_tablet_report_exceed_time_limit; } struct TBackendInfo { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index ecedf0ee1afad5..9d8cd9111ba5c1 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -114,6 +114,8 @@ struct TReportRequest { 11: i32 num_cores 12: i32 pipeline_executor_size 13: optional map partitions_version + // tablet num in be, in cloud num_tablets may not eq tablet_list.size() + 14: optional i64 num_tablets } struct TMasterResult { From 5c4d8d7061fe68a3d7ff219dd0fe01705d87a790 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Mon, 21 Oct 2024 20:30:34 +0800 Subject: [PATCH 2/5] add get_tablet_api and regression case --- be/src/cloud/cloud_tablet_mgr.cpp | 15 +++++++++++++++ be/src/cloud/cloud_tablet_mgr.h | 2 ++ be/src/http/action/tablets_info_action.cpp | 20 +++++++++++--------- be/src/olap/base_tablet.h | 3 +++ be/src/olap/tablet.cpp | 4 ---- be/src/olap/tablet.h | 3 --- be/src/olap/tablet_manager.cpp | 1 + be/src/olap/txn_manager.cpp | 1 + 8 files changed, 33 insertions(+), 16 deletions(-) diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 42744913c3d268..f471732f831e31 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -403,4 +403,19 @@ void CloudTabletMgr::build_all_report_tablets_info(std::map* << " exceed drop time limit count=" << tablets_info->size(); } +void CloudTabletMgr::obtain_specific_quantity_tablets(std::vector& tablets_info, + int64_t num) { + auto weak_tablets = get_weak_tablets(); + for (auto& weak_tablet : weak_tablets) { + auto t = weak_tablet.lock(); + if (t == nullptr) { + continue; + } + if (tablets_info.size() >= num) { + return; + } + tablets_info.push_back(t->get_tablet_info()); + } +} + } // namespace doris diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h index 2a588602bdc7aa..30351b893f402d 100644 --- a/be/src/cloud/cloud_tablet_mgr.h +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -71,6 +71,8 @@ class CloudTabletMgr { void build_all_report_tablets_info(std::map* tablets_info, uint64_t* tablet_num); + void obtain_specific_quantity_tablets(std::vector& tablets_info, int64_t num); + private: CloudStorageEngine& _engine; diff --git a/be/src/http/action/tablets_info_action.cpp b/be/src/http/action/tablets_info_action.cpp index 9c27c1de9a02b3..e3d08edddffc63 100644 --- a/be/src/http/action/tablets_info_action.cpp +++ b/be/src/http/action/tablets_info_action.cpp @@ -24,6 +24,8 @@ #include #include +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet_mgr.h" #include "cloud/config.h" #include "http/http_channel.h" #include "http/http_headers.h" @@ -51,12 +53,6 @@ void TabletsInfoAction::handle(HttpRequest* req) { EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) { EasyJson tablets_info_ej; - if (config::is_cloud_mode()) { - // TODO(plat1ko): CloudStorageEngine - tablets_info_ej["msg"] = "TabletsInfoAction::get_tablets_info is not implemented"; - tablets_info_ej["code"] = 0; - return tablets_info_ej; - } int64_t number; std::string msg; @@ -74,9 +70,15 @@ EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) { msg = "Parameter Error"; } std::vector tablets_info; - TabletManager* tablet_manager = - ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager(); - tablet_manager->obtain_specific_quantity_tablets(tablets_info, number); + if (!config::is_cloud_mode()) { + TabletManager* tablet_manager = + ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager(); + tablet_manager->obtain_specific_quantity_tablets(tablets_info, number); + } else { + CloudTabletMgr& cloud_tablet_manager = + ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr(); + cloud_tablet_manager.obtain_specific_quantity_tablets(tablets_info, number); + } tablets_info_ej["msg"] = msg; tablets_info_ej["code"] = 0; diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 4aaca77770db0f..2df368baf480ab 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -292,6 +292,9 @@ class BaseTablet { Status show_nested_index_file(std::string* json_meta); + TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); } + TabletInfo get_tablet_info() const { return TabletInfo(tablet_id(), tablet_uid()); } + protected: // Find the missed versions until the spec_version. // diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 9dfb7940dcc916..4c52dc2dd8eb31 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1201,10 +1201,6 @@ Status Tablet::_contains_version(const Version& version) { return Status::OK(); } -TabletInfo Tablet::get_tablet_info() const { - return TabletInfo(tablet_id(), tablet_uid()); -} - std::vector Tablet::pick_candidate_rowsets_to_cumulative_compaction() { std::vector candidate_rowsets; if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 2b4daa5a4c35ac..35d8162c2c1310 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -115,7 +115,6 @@ class Tablet final : public BaseTablet { DataDir* data_dir() const { return _data_dir; } int64_t replica_id() const { return _tablet_meta->replica_id(); } - TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); } const std::string& tablet_path() const { return _tablet_path; } @@ -279,8 +278,6 @@ class Tablet final : public BaseTablet { void check_tablet_path_exists(); - TabletInfo get_tablet_info() const; - std::vector pick_candidate_rowsets_to_cumulative_compaction(); std::vector pick_candidate_rowsets_to_base_compaction(); std::vector pick_candidate_rowsets_to_full_compaction(); diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 64eb408c9e3dbd..af884d6a38ecd5 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -41,6 +41,7 @@ #include "gutil/strings/strcat.h" #include "gutil/strings/substitute.h" #include "io/fs/local_file_system.h" +#include "olap/base_tablet.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/data_dir.h" #include "olap/olap_common.h" diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index d227f53053128b..9892a60cb585f6 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -34,6 +34,7 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "olap/base_tablet.h" #include "olap/data_dir.h" #include "olap/delta_writer.h" #include "olap/olap_common.h" From eb8a2a776ce3c4f93b683760e4770c6a939a2e9a Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Tue, 22 Oct 2024 22:57:47 +0800 Subject: [PATCH 3/5] add regression case --- be/src/agent/task_worker_pool.cpp | 5 + be/src/cloud/cloud_tablet_mgr.cpp | 11 +- ..._clean_tablet_when_drop_force_table.groovy | 140 ++++++++++++++++ .../test_clean_tablet_when_rebalance.groovy | 158 ++++++++++++++++++ 4 files changed, 310 insertions(+), 4 deletions(-) create mode 100644 regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy create mode 100644 regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index ff8eee2e5bd300..90e224d7ceaf26 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1658,6 +1658,11 @@ void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { const auto& drop_tablet_req = req.drop_tablet_req; + DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", { + LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed") + .tag("tablet_id", drop_tablet_req.tablet_id); + return; + }); // 1. erase lru from tablet mgr // TODO(dx) clean tablet file cache // get tablet's info(such as cachekey, tablet id, rsid) diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index f471732f831e31..33dafb2dfd0312 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -183,7 +183,7 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i CachePriority::NORMAL); auto ret = std::shared_ptr(tablet.get(), [this, handle](CloudTablet* tablet) { - int64_t now = duration_cast( + int64_t now = duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); tablet->last_cache_release_ms = now; @@ -202,7 +202,7 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i CloudTablet* tablet_raw_ptr = reinterpret_cast(_cache->value(handle))->tablet.get(); auto tablet = std::shared_ptr(tablet_raw_ptr, [this, handle](CloudTablet* tablet) { - int64_t now = duration_cast( + int64_t now = duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); tablet->last_cache_release_ms = now; @@ -381,8 +381,11 @@ void CloudTabletMgr::build_all_report_tablets_info(std::map* (*tablet_num)++; TTabletInfo tablet_info; tablet->build_tablet_report_info(&tablet_info); - if (::time(nullptr) - config::cloud_tablet_report_exceed_time_limit < - tablet->last_cache_release_ms / 1000) { + int64_t now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + if (now - config::cloud_tablet_report_exceed_time_limit * 1000 < + tablet->last_cache_release_ms) { // the tablet is still being accessed and used in recently, so not report it return; } diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy new file mode 100644 index 00000000000000..4dc847d603a324 --- /dev/null +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http + +suite('test_clean_tablet_when_drop_force_table', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=5' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1' + ] + options.setFeNum(3) + options.setBeNum(3) + options.cloudMode = true + options.enableDebugPoints() + + def backendIdToHost = { -> + def spb = sql_return_maparray """SHOW BACKENDS""" + def beIdToHost = [:] + spb.each { + beIdToHost[it.BackendId] = it.Host + } + beIdToHost + } + + def getTabletAndBeHostFromFe = { table -> + def result = sql_return_maparray """SHOW TABLETS FROM $table""" + def bes = backendIdToHost.call() + // tablet : host + def ret = [:] + result.each { + ret[it.TabletId] = bes[it.BackendId] + } + ret + } + + def getTabletAndBeHostFromBe = { -> + def bes = cluster.getAllBackends() + def ret = [:] + bes.each { be -> + // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} + def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data + def tablets = data.tablets.collect { it.tablet_id as String } + tablets.each{ + ret[it] = data.host + } + } + ret + } + + def testCase = { table, waitTime, useDp=false-> + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (1, 1), (2, 2), (3, 3) + """ + + for (int i = 0; i < 5; i++) { + sql """ + select * from $table + """ + } + + // before drop table force + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe.call() + logger.info("fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + beforeGetFromFe.each { + assertTrue(beforeGetFromBe.containsKey(it.Key)) + assertEquals(beforeGetFromBe[it.Key], it.Value) + } + if (useDp) { + GetDebugPoint().enableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed") + } + // after drop table force + + sql """ + DROP TABLE $table FORCE + """ + def futrue + if (useDp) { + futrue = thread { + sleep(10 * 1000) + GetDebugPoint().disableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed") + } + } + def start = System.currentTimeMillis() / 1000 + // tablet can't find in be + dockerAwaitUntil(50) { + def beTablets = getTabletAndBeHostFromBe.call().keySet() + logger.info("before drop tablets {}, after tablets {}", beforeGetFromFe, beTablets) + beforeGetFromFe.keySet().every { !getTabletAndBeHostFromBe.call().containsKey(it) } + } + logger.info("table {}, cost {}s", table, System.currentTimeMillis() / 1000 - start) + assertTrue(System.currentTimeMillis() / 1000 - start > waitTime) + if (useDp) { + futrue.get() + } + } + + docker(options) { + // because rehash_tablet_after_be_dead_seconds=5 + testCase("test_clean_tablet_when_drop_force_table_1", 5) + // report retry + testCase("test_clean_tablet_when_drop_force_table_2", 10, true) + } +} diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy new file mode 100644 index 00000000000000..acd1c5a89b78a5 --- /dev/null +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http + +suite('test_clean_tablet_when_rebalance', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + def rehashTime = 100 + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.feConfigs.add("rehash_tablet_after_be_dead_seconds=$rehashTime") + options.beConfigs += [ + 'report_tablet_interval_seconds=1' + ] + options.setFeNum(3) + options.setBeNum(3) + options.cloudMode = true + options.enableDebugPoints() + + def choseDeadBeIndex = 1 + def table = "test_clean_tablet_when_rebalance" + + def backendIdToHost = { -> + def spb = sql_return_maparray """SHOW BACKENDS""" + def beIdToHost = [:] + spb.each { + beIdToHost[it.BackendId] = it.Host + } + beIdToHost + } + + def getTabletAndBeHostFromFe = { -> + def result = sql_return_maparray """SHOW TABLETS FROM $table""" + def bes = backendIdToHost.call() + // tablet : host + def ret = [:] + result.each { + ret[it.TabletId] = bes[it.BackendId] + } + ret + } + + def getTabletAndBeHostFromBe = { -> + def bes = cluster.getAllBackends() + def ret = [:] + bes.each { be -> + // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} + def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data + def tablets = data.tablets.collect { it.tablet_id as String } + tablets.each{ + ret[it] = data.host + } + } + ret + } + + def testCase = { deadTime -> + boolean beDeadLong = deadTime > rehashTime ? true : false + logger.info("begin exec beDeadLong {}", beDeadLong) + + for (int i = 0; i < 5; i++) { + sql """ + select * from $table + """ + } + + def beforeGetFromFe = getTabletAndBeHostFromFe() + def beforeGetFromBe = getTabletAndBeHostFromBe.call() + logger.info("before fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + beforeGetFromFe.each { + assertTrue(beforeGetFromBe.containsKey(it.Key)) + assertEquals(beforeGetFromBe[it.Key], it.Value) + } + + cluster.stopBackends(choseDeadBeIndex) + dockerAwaitUntil(10) { + def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") + .collect { it.BackendId } + .unique() + logger.info("bes {}", bes) + bes.size() == 2 + } + + if (beDeadLong) { + setFeConfig('enable_cloud_partition_balance', false) + setFeConfig('enable_cloud_table_balance', false) + setFeConfig('enable_cloud_global_balance', false) + } + sleep(deadTime * 1000) + + cluster.startBackends(choseDeadBeIndex) + + dockerAwaitUntil(10) { + def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") + .collect { it.BackendId } + .unique() + logger.info("bes {}", bes) + bes.size() == (beDeadLong ? 2 : 3) + } + for (int i = 0; i < 5; i++) { + sql """ + select * from $table + """ + sleep(1000) + } + beforeGetFromFe = getTabletAndBeHostFromFe() + beforeGetFromBe = getTabletAndBeHostFromBe.call() + logger.info("after fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + beforeGetFromFe.each { + assertTrue(beforeGetFromBe.containsKey(it.Key)) + assertEquals(beforeGetFromBe[it.Key], it.Value) + } + } + + docker(options) { + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (1, 1), (2, 2), (3, 3) + """ + // 'rehash_tablet_after_be_dead_seconds=10' + // be-1 dead, but not dead for a long time + testCase(5) + // be-1 dead, and dead for a long time + testCase(200) + } +} From 3c569bf53dea9bdfcd74388582a1d4d846b2473f Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Wed, 23 Oct 2024 17:46:50 +0800 Subject: [PATCH 4/5] fix review --- be/src/cloud/config.h | 2 +- be/src/service/http_service.cpp | 2 +- .../cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index f504d2c9d5c843..c96dddc277dbe1 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -108,7 +108,7 @@ DECLARE_mInt32(tablet_txn_info_min_expired_seconds); DECLARE_mBool(enable_use_cloud_unique_id_from_fe); -DECLARE_mBool(enable_cloud_tablet_report); +DECLARE_Bool(enable_cloud_tablet_report); DECLARE_mInt32(cloud_tablet_report_exceed_time_limit); } // namespace doris::config diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 9330867ded65a1..31fa4f8a8b1b1c 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -374,7 +374,7 @@ void HttpService::register_local_handler(StorageEngine& engine) { _ev_http_server->register_handler(HttpMethod::POST, "/api/pad_rowset", pad_rowset_action); ReportAction* report_tablet_action = _pool.add(new ReportAction( - _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, "REPORT_OLAP_TABLE")); + _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, "REPORT_OLAP_TABLET")); _ev_http_server->register_handler(HttpMethod::GET, "/api/report/tablet", report_tablet_action); ReportAction* report_disk_action = _pool.add(new ReportAction( diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy index acd1c5a89b78a5..4a44b317cc2233 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -95,7 +95,7 @@ suite('test_clean_tablet_when_rebalance', 'docker') { } cluster.stopBackends(choseDeadBeIndex) - dockerAwaitUntil(10) { + dockerAwaitUntil(50) { def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") .collect { it.BackendId } .unique() @@ -112,7 +112,7 @@ suite('test_clean_tablet_when_rebalance', 'docker') { cluster.startBackends(choseDeadBeIndex) - dockerAwaitUntil(10) { + dockerAwaitUntil(50) { def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") .collect { it.BackendId } .unique() From f49234c345a523e9a168a91919fa4a865b8f7942 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Sun, 27 Oct 2024 12:33:59 +0800 Subject: [PATCH 5/5] fix review two --- be/src/agent/heartbeat_server.cpp | 12 ++---- be/src/agent/task_worker_pool.cpp | 6 +-- be/src/cloud/cloud_tablet.h | 2 +- be/src/cloud/cloud_tablet_mgr.cpp | 38 +++++++++---------- be/src/cloud/cloud_tablet_mgr.h | 10 ++++- be/src/cloud/config.cpp | 2 - be/src/cloud/config.h | 1 - be/src/http/action/tablets_info_action.cpp | 2 +- be/src/olap/tablet_manager.cpp | 1 - be/src/olap/txn_manager.cpp | 1 - .../cloud/master/CloudReportHandler.java | 3 -- .../org/apache/doris/system/HeartbeatMgr.java | 2 +- gensrc/thrift/HeartbeatService.thrift | 2 +- 13 files changed, 39 insertions(+), 43 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index b4188e4fb3840e..78002ed08fe0df 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -26,6 +26,7 @@ #include #include +#include "cloud/cloud_tablet_mgr.h" #include "cloud/config.h" #include "common/config.h" #include "common/status.h" @@ -275,14 +276,9 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st; } - if (master_info.__isset.cloud_tablet_report_exceed_time_limit && - config::cloud_tablet_report_exceed_time_limit <= 0) { - // be not set, use fe heartbeat, default be not set it - auto st = config::set_config( - "cloud_tablet_report_exceed_time_limit", - std::to_string(master_info.cloud_tablet_report_exceed_time_limit), true); - LOG(INFO) << "set config cloud_tablet_report_exceed_time_limit " - << master_info.cloud_tablet_report_exceed_time_limit << " " << st; + if (master_info.__isset.tablet_report_inactive_duration_ms) { + doris::g_tablet_report_inactive_duration_ms = + master_info.tablet_report_inactive_duration_ms; } if (need_report) { diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 90e224d7ceaf26..d9efe6dbedde24 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1154,11 +1154,11 @@ void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& maste increase_report_version(); uint64_t report_version; - uint64_t tablet_num = 0; + uint64_t total_num_tablets = 0; for (int i = 0; i < 5; i++) { request.tablets.clear(); report_version = s_report_version; - engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &tablet_num); + engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets); if (report_version == s_report_version) { break; } @@ -1171,7 +1171,7 @@ void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& maste } request.__set_report_version(report_version); - request.__set_num_tablets(tablet_num); + request.__set_num_tablets(total_num_tablets); bool succ = handle_report(request, master_info, "tablet"); report_tablet_total << 1; diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index ef89afefd5a115..465447728aee93 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -196,7 +196,7 @@ class CloudTablet final : public BaseTablet { int64_t last_base_compaction_success_time_ms = 0; int64_t last_cumu_compaction_success_time_ms = 0; int64_t last_cumu_no_suitable_version_ms = 0; - int64_t last_cache_release_ms = 0; + int64_t last_access_time_ms = 0; // Return merged extended schema TabletSchemaSPtr merged_tablet_schema() const override; diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 33dafb2dfd0312..7ecb72e62fd5de 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -28,6 +28,7 @@ #include "runtime/memory/cache_policy.h" namespace doris { +uint64_t g_tablet_report_inactive_duration_ms = 0; namespace { // port from @@ -142,6 +143,12 @@ CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) CloudTabletMgr::~CloudTabletMgr() = default; +void set_tablet_access_time_ms(CloudTablet* tablet) { + using namespace std::chrono; + int64_t now = duration_cast(system_clock::now().time_since_epoch()).count(); + tablet->last_access_time_ms = now; +} + Result> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data) { // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` @@ -183,10 +190,7 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i CachePriority::NORMAL); auto ret = std::shared_ptr(tablet.get(), [this, handle](CloudTablet* tablet) { - int64_t now = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - tablet->last_cache_release_ms = now; + set_tablet_access_time_ms(tablet); _cache->release(handle); }); _tablet_map->put(std::move(tablet)); @@ -197,15 +201,14 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i if (tablet == nullptr) { return ResultError(Status::InternalError("failed to get tablet {}", tablet_id)); } + set_tablet_access_time_ms(tablet.get()); return tablet; } CloudTablet* tablet_raw_ptr = reinterpret_cast(_cache->value(handle))->tablet.get(); + set_tablet_access_time_ms(tablet_raw_ptr); auto tablet = std::shared_ptr(tablet_raw_ptr, [this, handle](CloudTablet* tablet) { - int64_t now = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - tablet->last_cache_release_ms = now; + set_tablet_access_time_ms(tablet); _cache->release(handle); }); return tablet; @@ -381,11 +384,9 @@ void CloudTabletMgr::build_all_report_tablets_info(std::map* (*tablet_num)++; TTabletInfo tablet_info; tablet->build_tablet_report_info(&tablet_info); - int64_t now = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - if (now - config::cloud_tablet_report_exceed_time_limit * 1000 < - tablet->last_cache_release_ms) { + using namespace std::chrono; + int64_t now = duration_cast(system_clock::now().time_since_epoch()).count(); + if (now - g_tablet_report_inactive_duration_ms * 1000 < tablet->last_access_time_ms) { // the tablet is still being accessed and used in recently, so not report it return; } @@ -406,18 +407,17 @@ void CloudTabletMgr::build_all_report_tablets_info(std::map* << " exceed drop time limit count=" << tablets_info->size(); } -void CloudTabletMgr::obtain_specific_quantity_tablets(std::vector& tablets_info, - int64_t num) { +void CloudTabletMgr::get_tablet_info(int64_t num_tablets, std::vector* tablets_info) { auto weak_tablets = get_weak_tablets(); for (auto& weak_tablet : weak_tablets) { - auto t = weak_tablet.lock(); - if (t == nullptr) { + auto tablet = weak_tablet.lock(); + if (tablet == nullptr) { continue; } - if (tablets_info.size() >= num) { + if (tablets_info->size() >= num_tablets) { return; } - tablets_info.push_back(t->get_tablet_info()); + tablets_info->push_back(tablet->get_tablet_info()); } } diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h index 30351b893f402d..903f372cbdec5f 100644 --- a/be/src/cloud/cloud_tablet_mgr.h +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -34,6 +34,8 @@ class CloudStorageEngine; class LRUCachePolicy; class CountDownLatch; +extern uint64_t g_tablet_report_inactive_duration_ms; + class CloudTabletMgr { public: CloudTabletMgr(CloudStorageEngine& engine); @@ -68,10 +70,16 @@ class CloudTabletMgr { std::vector>* tablets, int64_t* max_score); + /** + * Gets tablets info and total tablet num that are reported + * + * @param tablets_info used by report + * @param tablet_num tablets in be tabletMgr, total num + */ void build_all_report_tablets_info(std::map* tablets_info, uint64_t* tablet_num); - void obtain_specific_quantity_tablets(std::vector& tablets_info, int64_t num); + void get_tablet_info(int64_t num_tablets, std::vector* tablets_info); private: CloudStorageEngine& _engine; diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 19c1add821c8ac..32e3250f87c258 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -76,6 +76,4 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120"); DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true"); DEFINE_mBool(enable_cloud_tablet_report, "true"); - -DEFINE_mInt32(cloud_tablet_report_exceed_time_limit, "0"); } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index c96dddc277dbe1..8af967afb8c67b 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -110,5 +110,4 @@ DECLARE_mBool(enable_use_cloud_unique_id_from_fe); DECLARE_Bool(enable_cloud_tablet_report); -DECLARE_mInt32(cloud_tablet_report_exceed_time_limit); } // namespace doris::config diff --git a/be/src/http/action/tablets_info_action.cpp b/be/src/http/action/tablets_info_action.cpp index e3d08edddffc63..672b03ce6ceaed 100644 --- a/be/src/http/action/tablets_info_action.cpp +++ b/be/src/http/action/tablets_info_action.cpp @@ -77,7 +77,7 @@ EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) { } else { CloudTabletMgr& cloud_tablet_manager = ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr(); - cloud_tablet_manager.obtain_specific_quantity_tablets(tablets_info, number); + cloud_tablet_manager.get_tablet_info(number, &tablets_info); } tablets_info_ej["msg"] = msg; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index af884d6a38ecd5..64eb408c9e3dbd 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -41,7 +41,6 @@ #include "gutil/strings/strcat.h" #include "gutil/strings/substitute.h" #include "io/fs/local_file_system.h" -#include "olap/base_tablet.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/data_dir.h" #include "olap/olap_common.h" diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 9892a60cb585f6..d227f53053128b 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -34,7 +34,6 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" -#include "olap/base_tablet.h" #include "olap/data_dir.h" #include "olap/delta_writer.h" #include "olap/olap_common.h" diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java index e8473ff1bc8a6f..6564bd7d3a51a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java @@ -74,9 +74,6 @@ private static void deleteFromBackend(long backendId, Set tabletIdsWillDro int deleteFromBackendCounter = 0; AgentBatchTask batchTask = new AgentBatchTask(); for (Long tabletId : tabletIdsWillDrop) { - if (LOG.isDebugEnabled()) { - LOG.debug("process tablet [{}], backend[{}]", tabletId, backendId); - } DropReplicaTask task = new DropReplicaTask(backendId, tabletId, -1, -1, false); batchTask.addTask(task); LOG.info("delete tablet[{}] from backend[{}]", tabletId, backendId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 0feca773571a6b..7fe5bf0d44296b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -253,7 +253,7 @@ public HeartbeatResponse call() { if (Config.isCloudMode()) { String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID); copiedMasterInfo.setCloudUniqueId(cloudUniqueId); - copiedMasterInfo.setCloudTabletReportExceedTimeLimit(Config.rehash_tablet_after_be_dead_seconds); + copiedMasterInfo.setTabletReportInactiveDurationMs(Config.rehash_tablet_after_be_dead_seconds); } THeartbeatResult result; if (!FeConstants.runningUnitTest) { diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 56cbece82ab68f..acdc608f21b773 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -42,7 +42,7 @@ struct TMasterInfo { 10: optional string meta_service_endpoint; 11: optional string cloud_unique_id; // See configuration item Config.java rehash_tablet_after_be_dead_seconds for meaning - 12: optional i64 cloud_tablet_report_exceed_time_limit; + 12: optional i64 tablet_report_inactive_duration_ms; } struct TBackendInfo {