diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index cbe6ab8ae24112..d6eb54e5c4190f 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -29,6 +29,7 @@ #include #include +#include "cloud/cloud_tablet.h" #include "cloud/config.h" #include "cloud/pb_convert.h" #include "common/logging.h" @@ -38,7 +39,6 @@ #include "gen_cpp/olap_file.pb.h" #include "olap/olap_common.h" #include "olap/rowset/rowset_factory.h" -#include "olap/tablet.h" #include "olap/tablet_meta.h" #include "runtime/stream_load/stream_load_context.h" #include "util/network_util.h" @@ -270,12 +270,12 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tab return Status::OK(); } -Status CloudMetaMgr::sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data) { +Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data) { return Status::NotSupported("CloudMetaMgr::sync_tablet_rowsets is not implemented"); } Status CloudMetaMgr::sync_tablet_delete_bitmap( - Tablet* tablet, int64_t old_max_version, + CloudTablet* tablet, int64_t old_max_version, const google::protobuf::RepeatedPtrField& rs_metas, const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap) { return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is not implemented"); @@ -425,15 +425,15 @@ Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const TabletSchema& return Status::OK(); } -Status CloudMetaMgr::update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t initiator, - DeleteBitmap* delete_bitmap) { - VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet->tablet_id(); +Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, + int64_t initiator, DeleteBitmap* delete_bitmap) { + VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id(); UpdateDeleteBitmapRequest req; UpdateDeleteBitmapResponse res; req.set_cloud_unique_id(config::cloud_unique_id); - req.set_table_id(tablet->table_id()); - req.set_partition_id(tablet->partition_id()); - req.set_tablet_id(tablet->tablet_id()); + req.set_table_id(tablet.table_id()); + req.set_partition_id(tablet.partition_id()); + req.set_tablet_id(tablet.tablet_id()); req.set_lock_id(lock_id); req.set_initiator(initiator); for (auto iter = delete_bitmap->delete_bitmap.begin(); @@ -451,18 +451,18 @@ Status CloudMetaMgr::update_delete_bitmap(const Tablet* tablet, int64_t lock_id, if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) { return Status::Error( "lock expired when update delete bitmap, tablet_id: {}, lock_id: {}", - tablet->tablet_id(), lock_id); + tablet.tablet_id(), lock_id); } return st; } -Status CloudMetaMgr::get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id, +Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, int64_t initiator) { - VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << tablet->tablet_id(); + VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id(); GetDeleteBitmapUpdateLockRequest req; GetDeleteBitmapUpdateLockResponse res; req.set_cloud_unique_id(config::cloud_unique_id); - req.set_table_id(tablet->table_id()); + req.set_table_id(tablet.table_id()); req.set_lock_id(lock_id); req.set_initiator(initiator); req.set_expiration(10); // 10s expiration time for compaction and schema_change diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index 6557a6eab8a1db..af5b048b2f06c0 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -29,7 +29,7 @@ namespace doris { class DeleteBitmap; class StreamLoadContext; -class Tablet; +class CloudTablet; class TabletMeta; class TabletSchema; class RowsetMeta; @@ -51,7 +51,7 @@ class CloudMetaMgr { Status get_tablet_meta(int64_t tablet_id, std::shared_ptr* tablet_meta); - Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false); + Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false); Status prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp, std::shared_ptr* existed_rs_meta = nullptr); @@ -79,14 +79,15 @@ class CloudMetaMgr { Status update_tablet_schema(int64_t tablet_id, const TabletSchema& tablet_schema); - Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t initiator, + Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, int64_t initiator, DeleteBitmap* delete_bitmap); - Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id, int64_t initiator); + Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, + int64_t initiator); private: Status sync_tablet_delete_bitmap( - Tablet* tablet, int64_t old_max_version, + CloudTablet* tablet, int64_t old_max_version, const google::protobuf::RepeatedPtrField& rs_metas, const TabletStatsPB& stas, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap); }; diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h new file mode 100644 index 00000000000000..87e3ed52d39041 --- /dev/null +++ b/be/src/cloud/cloud_storage_engine.h @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +namespace doris { +namespace cloud { +class CloudMetaMgr; +} + +class CloudStorageEngine { +public: + CloudStorageEngine(); + + ~CloudStorageEngine(); + + cloud::CloudMetaMgr& meta_mgr() { return *_meta_mgr; } + +private: + std::unique_ptr _meta_mgr; +}; + +} // namespace doris diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp new file mode 100644 index 00000000000000..03670df78d10d3 --- /dev/null +++ b/be/src/cloud/cloud_tablet.cpp @@ -0,0 +1,441 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet.h" + +#include +#include +#include +#include +#include + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "io/cache/block/block_file_cache_factory.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" + +namespace doris { +using namespace ErrorCode; + +CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta) + : BaseTablet(std::move(tablet_meta)), _engine(engine) {} + +CloudTablet::~CloudTablet() = default; + +bool CloudTablet::exceed_version_limit(int32_t limit) { + return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit; +} + +Status CloudTablet::capture_rs_readers(const Version& spec_version, + std::vector* rs_splits, + bool skip_missing_version) { + Versions version_path; + std::shared_lock rlock(_meta_lock); + auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); + if (!st.ok()) { + rlock.unlock(); // avoid logging in lock range + // Check no missed versions or req version is merged + auto missed_versions = calc_missed_versions(spec_version.second); + if (missed_versions.empty()) { + st.set_code(VERSION_ALREADY_MERGED); // Reset error code + } + st.append(" tablet_id=" + std::to_string(tablet_id())); + // clang-format off + LOG(WARNING) << st << '\n' << [this]() { std::string json; get_compaction_status(&json); return json; }(); + // clang-format on + return st; + } + VLOG_DEBUG << "capture consitent versions: " << version_path; + return capture_rs_readers_unlocked(version_path, rs_splits); +} + +// for example: +// [0-4][5-5][8-8][9-9][13-13] +// if spec_version = 12, it will return [6-7],[10-12] +Versions CloudTablet::calc_missed_versions(int64_t spec_version) { + DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; + + Versions missed_versions; + Versions existing_versions; + { + std::shared_lock rdlock(_meta_lock); + for (const auto& rs : _tablet_meta->all_rs_metas()) { + existing_versions.emplace_back(rs->version()); + } + } + + // sort the existing versions in ascending order + std::sort(existing_versions.begin(), existing_versions.end(), + [](const Version& a, const Version& b) { + // simple because 2 versions are certainly not overlapping + return a.first < b.first; + }); + + auto min_version = existing_versions.front().first; + if (min_version > 0) { + missed_versions.emplace_back(0, std::min(spec_version, min_version - 1)); + } + for (auto it = existing_versions.begin(); it != existing_versions.end() - 1; ++it) { + auto prev_v = it->second; + if (prev_v >= spec_version) { + return missed_versions; + } + auto next_v = (it + 1)->first; + if (next_v > prev_v + 1) { + // there is a hole between versions + missed_versions.emplace_back(prev_v + 1, std::min(spec_version, next_v - 1)); + } + } + auto max_version = existing_versions.back().second; + if (max_version < spec_version) { + missed_versions.emplace_back(max_version + 1, spec_version); + } + return missed_versions; +} + +Status CloudTablet::sync_meta() { + // TODO(lightman): FileCache + return Status::NotSupported("CloudTablet::sync_meta is not implemented"); +} + +// There are only two tablet_states RUNNING and NOT_READY in cloud mode +// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS. +Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) { + RETURN_IF_ERROR(sync_if_not_running()); + + if (query_version > 0) { + std::shared_lock rlock(_meta_lock); + if (_max_version >= query_version) { + return Status::OK(); + } + } + + // serially execute sync to reduce unnecessary network overhead + std::lock_guard lock(_sync_meta_lock); + if (query_version > 0) { + std::shared_lock rlock(_meta_lock); + if (_max_version >= query_version) { + return Status::OK(); + } + } + + auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data); + if (st.is()) { + recycle_cached_data(); + } + return st; +} + +// Sync tablet meta and all rowset meta if not running. +// This could happen when BE didn't finish schema change job and another BE committed this schema change job. +// It should be a quite rare situation. +Status CloudTablet::sync_if_not_running() { + if (tablet_state() == TABLET_RUNNING) { + return Status::OK(); + } + + // Serially execute sync to reduce unnecessary network overhead + std::lock_guard lock(_sync_meta_lock); + + { + std::shared_lock rlock(_meta_lock); + if (tablet_state() == TABLET_RUNNING) { + return Status::OK(); + } + } + + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta); + if (!st.ok()) { + if (st.is()) { + recycle_cached_data(); + } + return st; + } + + if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] { + // MoW may go to here when load while schema change + return Status::Error("invalid tablet state {}. tablet_id={}", + tablet_meta->tablet_state(), tablet_id()); + } + + TimestampedVersionTracker empty_tracker; + { + std::lock_guard wlock(_meta_lock); + RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING)); + _rs_version_map.clear(); + _stale_rs_version_map.clear(); + std::swap(_timestamped_version_tracker, empty_tracker); + _tablet_meta->clear_rowsets(); + _tablet_meta->clear_stale_rowset(); + _max_version = -1; + } + + st = _engine.meta_mgr().sync_tablet_rowsets(this); + if (st.is()) { + recycle_cached_data(); + } + return st; +} + +void CloudTablet::add_rowsets(std::vector to_add, bool version_overlap, + std::unique_lock& meta_lock, + bool warmup_delta_data) { + if (to_add.empty()) { + return; + } + + auto add_rowsets_directly = [=, this](std::vector& rowsets) { + for (auto& rs : rowsets) { + _rs_version_map.emplace(rs->version(), rs); + _timestamped_version_tracker.add_version(rs->version()); + _max_version = std::max(rs->end_version(), _max_version); + update_base_size(*rs); + } + _tablet_meta->add_rowsets_unchecked(rowsets); + // TODO(plat1ko): Warmup delta rowset data in background + }; + + if (!version_overlap) { + add_rowsets_directly(to_add); + return; + } + + // Filter out existed rowsets + auto remove_it = + std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) { + if (auto find_it = _rs_version_map.find(rs->version()); + find_it == _rs_version_map.end()) { + return false; + } else if (find_it->second->rowset_id() == rs->rowset_id()) { + return true; // Same rowset + } + + // If version of rowset in `to_add` is equal to rowset in tablet but rowset_id is not equal, + // replace existed rowset with `to_add` rowset. This may occur when: + // 1. schema change converts rowsets which have been double written to new tablet + // 2. cumu compaction picks single overlapping input rowset to perform compaction + _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr); + _rs_version_map[rs->version()] = rs; + _tablet_meta->add_rowsets_unchecked({rs}); + update_base_size(*rs); + return true; + }); + + to_add.erase(remove_it, to_add.end()); + + // delete rowsets with overlapped version + std::vector to_add_directly; + for (auto& to_add_rs : to_add) { + // delete rowsets with overlapped version + std::vector to_delete; + Version to_add_v = to_add_rs->version(); + // if start_version > max_version, we can skip checking overlap here. + if (to_add_v.first > _max_version) { + // if start_version > max_version, we can skip checking overlap here. + to_add_directly.push_back(to_add_rs); + } else { + to_add_directly.push_back(to_add_rs); + for (auto& [v, rs] : _rs_version_map) { + if (to_add_v.contains(v)) { + to_delete.push_back(rs); + } + } + delete_rowsets(to_delete, meta_lock); + } + } + + add_rowsets_directly(to_add_directly); +} + +void CloudTablet::delete_rowsets(const std::vector& to_delete, + std::unique_lock&) { + if (to_delete.empty()) { + return; + } + std::vector rs_metas; + rs_metas.reserve(to_delete.size()); + for (auto&& rs : to_delete) { + rs_metas.push_back(rs->rowset_meta()); + _stale_rs_version_map[rs->version()] = rs; + } + _timestamped_version_tracker.add_stale_path_version(rs_metas); + for (auto&& rs : to_delete) { + _rs_version_map.erase(rs->version()); + } + + _tablet_meta->modify_rs_metas({}, rs_metas, false); +} + +int CloudTablet::delete_expired_stale_rowsets() { + std::vector expired_rowsets; + int64_t expired_stale_sweep_endtime = + ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec; + { + std::unique_lock wlock(_meta_lock); + + std::vector path_ids; + // capture the path version to delete + _timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids); + + if (path_ids.empty()) { + return 0; + } + + for (int64_t path_id : path_ids) { + // delete stale versions in version graph + auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id); + for (auto& v_ts : version_path->timestamped_versions()) { + auto rs_it = _stale_rs_version_map.find(v_ts->version()); + if (rs_it != _stale_rs_version_map.end()) { + expired_rowsets.push_back(rs_it->second); + _stale_rs_version_map.erase(rs_it); + } else { + LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet " + << tablet_id(); + // clang-format off + DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }(); + // clang-format on + } + _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version()); + VLOG_DEBUG << "delete stale rowset " << v_ts->version(); + } + } + _reconstruct_version_tracker_if_necessary(); + } + recycle_cached_data(expired_rowsets); + return expired_rowsets.size(); +} + +void CloudTablet::update_base_size(const Rowset& rs) { + // Define base rowset as the rowset of version [2-x] + if (rs.start_version() == 2) { + _base_size = rs.data_disk_size(); + } +} + +void CloudTablet::recycle_cached_data() { + // TODO(plat1ko) +} + +void CloudTablet::recycle_cached_data(const std::vector& rowsets) { + // TODO(plat1ko) +} + +void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments, + int64_t num_rows, int64_t data_size) { + _approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed); + _approximate_num_segments.store(num_segments, std::memory_order_relaxed); + _approximate_num_rows.store(num_rows, std::memory_order_relaxed); + _approximate_data_size.store(data_size, std::memory_order_relaxed); + int64_t cumu_num_deltas = 0; + int64_t cumu_num_rowsets = 0; + auto cp = _cumulative_point.load(std::memory_order_relaxed); + for (auto& [v, r] : _rs_version_map) { + if (v.second < cp) { + continue; + } + + cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1; + ++cumu_num_rowsets; + } + _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed); + _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed); +} + +Result> CloudTablet::create_rowset_writer( + RowsetWriterContext& context, bool vertical) { + return ResultError( + Status::NotSupported("CloudTablet::create_rowset_writer is not implemented")); +} + +// return a json string to show the compaction status of this tablet +void CloudTablet::get_compaction_status(std::string* json_result) { + rapidjson::Document root; + root.SetObject(); + + rapidjson::Document path_arr; + path_arr.SetArray(); + + std::vector rowsets; + std::vector stale_rowsets; + { + std::shared_lock rdlock(_meta_lock); + rowsets.reserve(_rs_version_map.size()); + for (auto& it : _rs_version_map) { + rowsets.push_back(it.second); + } + stale_rowsets.reserve(_stale_rs_version_map.size()); + for (auto& it : _stale_rs_version_map) { + stale_rowsets.push_back(it.second); + } + } + std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator); + std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator); + + // get snapshot version path json_doc + _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr); + root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator()); + + // print all rowsets' version as an array + rapidjson::Document versions_arr; + rapidjson::Document missing_versions_arr; + versions_arr.SetArray(); + missing_versions_arr.SetArray(); + int64_t last_version = -1; + for (auto& rowset : rowsets) { + const Version& ver = rowset->version(); + if (ver.first != last_version + 1) { + rapidjson::Value miss_value; + miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(), + missing_versions_arr.GetAllocator()); + missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator()); + } + rapidjson::Value value; + std::string version_str = rowset->get_rowset_info_str(); + value.SetString(version_str.c_str(), version_str.length(), versions_arr.GetAllocator()); + versions_arr.PushBack(value, versions_arr.GetAllocator()); + last_version = ver.second; + } + root.AddMember("rowsets", versions_arr, root.GetAllocator()); + root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator()); + + // print all stale rowsets' version as an array + rapidjson::Document stale_versions_arr; + stale_versions_arr.SetArray(); + for (auto& rowset : stale_rowsets) { + rapidjson::Value value; + std::string version_str = rowset->get_rowset_info_str(); + value.SetString(version_str.c_str(), version_str.length(), + stale_versions_arr.GetAllocator()); + stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator()); + } + root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator()); + + // add stale version rowsets + root.AddMember("stale version path", path_arr, root.GetAllocator()); + + // to json string + rapidjson::StringBuffer strbuf; + rapidjson::PrettyWriter writer(strbuf); + root.Accept(writer); + *json_result = std::string(strbuf.GetString()); +} + +} // namespace doris diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h new file mode 100644 index 00000000000000..537c8fe134d13f --- /dev/null +++ b/be/src/cloud/cloud_tablet.h @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "olap/base_tablet.h" +#include "olap/version_graph.h" + +namespace doris { + +class CloudStorageEngine; + +class CloudTablet final : public BaseTablet { +public: + CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta); + + ~CloudTablet() override; + + bool exceed_version_limit(int32_t limit) override; + + Result> create_rowset_writer(RowsetWriterContext& context, + bool vertical) override; + + Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, + bool skip_missing_version) override; + + size_t tablet_footprint() override { + return _approximate_data_size.load(std::memory_order_relaxed); + } + + // clang-format off + int64_t fetch_add_approximate_num_rowsets (int64_t x) { return _approximate_num_rowsets .fetch_add(x, std::memory_order_relaxed); } + int64_t fetch_add_approximate_num_segments(int64_t x) { return _approximate_num_segments.fetch_add(x, std::memory_order_relaxed); } + int64_t fetch_add_approximate_num_rows (int64_t x) { return _approximate_num_rows .fetch_add(x, std::memory_order_relaxed); } + int64_t fetch_add_approximate_data_size (int64_t x) { return _approximate_data_size .fetch_add(x, std::memory_order_relaxed); } + int64_t fetch_add_approximate_cumu_num_rowsets (int64_t x) { return _approximate_cumu_num_rowsets.fetch_add(x, std::memory_order_relaxed); } + int64_t fetch_add_approximate_cumu_num_deltas (int64_t x) { return _approximate_cumu_num_deltas.fetch_add(x, std::memory_order_relaxed); } + // clang-format on + + // meta lock must be held when calling this function + void reset_approximate_stats(int64_t num_rowsets, int64_t num_segments, int64_t num_rows, + int64_t data_size); + + // return a json string to show the compaction status of this tablet + void get_compaction_status(std::string* json_result); + + // Synchronize the rowsets from meta service. + // If tablet state is not `TABLET_RUNNING`, sync tablet meta and all visible rowsets. + // If `query_version` > 0 and local max_version of the tablet >= `query_version`, do nothing. + // If 'need_download_data_async' is true, it means that we need to download the new version + // rowsets datas async. + Status sync_rowsets(int64_t query_version = -1, bool warmup_delta_data = false); + + // Synchronize the tablet meta from meta service. + Status sync_meta(); + + // If `version_overlap` is true, function will delete rowsets with overlapped version in this tablet. + // If 'warmup_delta_data' is true, download the new version rowset data in background. + // MUST hold EXCLUSIVE `_meta_lock`. + // If 'need_download_data_async' is true, it means that we need to download the new version + // rowsets datas async. + void add_rowsets(std::vector to_add, bool version_overlap, + std::unique_lock& meta_lock, + bool warmup_delta_data = false); + + // MUST hold EXCLUSIVE `_meta_lock`. + void delete_rowsets(const std::vector& to_delete, + std::unique_lock& meta_lock); + + // When the tablet is dropped, we need to recycle cached data: + // 1. The data in file cache + // 2. The memory in tablet cache + void recycle_cached_data(); + + void recycle_cached_data(const std::vector& rowsets); + + // Return number of deleted stale rowsets + int delete_expired_stale_rowsets(); + +private: + Versions calc_missed_versions(int64_t spec_version); + + // FIXME(plat1ko): No need to record base size if rowsets are ordered by version + void update_base_size(const Rowset& rs); + + Status sync_if_not_running(); + + CloudStorageEngine& _engine; + + // this mutex MUST ONLY be used when sync meta + bthread::Mutex _sync_meta_lock; + + std::atomic _cumulative_point {-1}; + std::atomic _approximate_num_rowsets {-1}; + std::atomic _approximate_num_segments {-1}; + std::atomic _approximate_num_rows {-1}; + std::atomic _approximate_data_size {-1}; + std::atomic _approximate_cumu_num_rowsets {-1}; + // Number of sorted arrays (e.g. for rowset with N segments, if rowset is overlapping, delta is N, otherwise 1) after cumu point + std::atomic _approximate_cumu_num_deltas {-1}; + + [[maybe_unused]] int64_t _base_compaction_cnt = 0; + [[maybe_unused]] int64_t _cumulative_compaction_cnt = 0; + int64_t _max_version = -1; + int64_t _base_size = 0; +}; + +} // namespace doris diff --git a/be/src/common/status.h b/be/src/common/status.h index 5199e86d120da2..2bec1c397e8e89 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -274,7 +274,8 @@ namespace ErrorCode { E(INVERTED_INDEX_COMPACTION_ERROR, -6010, false); \ E(KEY_NOT_FOUND, -7000, false); \ E(KEY_ALREADY_EXISTS, -7001, false); \ - E(ENTRY_NOT_FOUND, -7002, false); + E(ENTRY_NOT_FOUND, -7002, false); \ + E(INVALID_TABLET_STATE, -7211, false); // Define constexpr int error_code_name = error_code_value #define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE; diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 43f514906cf02e..18445cb17a67da 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -19,6 +19,8 @@ #include +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_reader.h" #include "olap/tablet_fwd.h" #include "olap/tablet_schema_cache.h" #include "util/doris_metrics.h" @@ -79,4 +81,42 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_ return Status::OK(); } +Status BaseTablet::capture_rs_readers_unlocked(const std::vector& version_path, + std::vector* rs_splits) const { + DCHECK(rs_splits != nullptr && rs_splits->empty()); + for (auto version : version_path) { + auto it = _rs_version_map.find(version); + if (it == _rs_version_map.end()) { + VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" << tablet_id() + << ", version='" << version.first << "-" << version.second; + + it = _stale_rs_version_map.find(version); + if (it == _stale_rs_version_map.end()) { + return Status::Error( + "fail to find Rowset in stale_rs_version for version. tablet={}, " + "version={}-{}", + tablet_id(), version.first, version.second); + } + } + RowsetReaderSharedPtr rs_reader; + auto res = it->second->create_reader(&rs_reader); + if (!res.ok()) { + return Status::Error( + "failed to create reader for rowset:{}", it->second->rowset_id().to_string()); + } + rs_splits->push_back(RowSetSplits(std::move(rs_reader))); + } + return Status::OK(); +} + +bool BaseTablet::_reconstruct_version_tracker_if_necessary() { + double orphan_vertex_ratio = _timestamped_version_tracker.get_orphan_vertex_ratio(); + if (orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio) { + _timestamped_version_tracker.construct_versioned_tracker( + _tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas()); + return true; + } + return false; +} + } /* namespace doris */ diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 2fa494b420aab3..bb327b3953298c 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "olap/tablet_fwd.h" #include "olap/tablet_meta.h" +#include "olap/version_graph.h" #include "util/metrics.h" namespace doris { @@ -72,19 +73,33 @@ class BaseTablet { return _max_version_schema; } - virtual bool exceed_version_limit(int32_t limit) const = 0; + virtual bool exceed_version_limit(int32_t limit) = 0; virtual Result> create_rowset_writer(RowsetWriterContext& context, bool vertical) = 0; virtual Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - bool skip_missing_version) const = 0; + bool skip_missing_version) = 0; virtual size_t tablet_footprint() = 0; + // MUST hold shared meta lock + Status capture_rs_readers_unlocked(const std::vector& version_path, + std::vector* rs_splits) const; + protected: + bool _reconstruct_version_tracker_if_necessary(); + mutable std::shared_mutex _meta_lock; + TimestampedVersionTracker _timestamped_version_tracker; + // After version 0.13, all newly created rowsets are saved in _rs_version_map. + // And if rowset being compacted, the old rowsetis will be saved in _stale_rs_version_map; + std::unordered_map _rs_version_map; + // This variable _stale_rs_version_map is used to record these rowsets which are be compacted. + // These _stale rowsets are been removed when rowsets' pathVersion is expired, + // this policy is judged and computed by TimestampedVersionTracker. + std::unordered_map _stale_rs_version_map; const TabletMetaSharedPtr _tablet_meta; TabletSchemaSPtr _max_version_schema; diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp index b3e6b1ca9e0a1c..d09fce730e5395 100644 --- a/be/src/olap/rowset/rowset.cpp +++ b/be/src/olap/rowset/rowset.cpp @@ -92,4 +92,13 @@ void Rowset::merge_rowset_meta(const RowsetMetaSharedPtr& other) { } } +std::string Rowset::get_rowset_info_str() { + std::string disk_size = PrettyPrinter::print( + static_cast(_rowset_meta->total_disk_size()), TUnit::BYTES); + return fmt::format("[{}-{}] {} {} {} {} {}", start_version(), end_version(), num_segments(), + _rowset_meta->has_delete_predicate() ? "DELETE" : "DATA", + SegmentsOverlapPB_Name(_rowset_meta->segments_overlap()), + rowset_id().to_string(), disk_size); +} + } // namespace doris diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index a2a275f2eac32e..a1355a81198a97 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -302,6 +302,8 @@ class Rowset : public std::enable_shared_from_this { // set skip index compaction next time void set_skip_index_compaction(int32_t column_id) { skip_index_compaction.insert(column_id); } + std::string get_rowset_info_str(); + protected: friend class RowsetFactory; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 66e6ffcf2b22c2..327afd6a8c41de 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -833,7 +833,8 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& } // acquire data sources correspond to history versions - RETURN_IF_ERROR(base_tablet->capture_rs_readers(versions_to_be_changed, &rs_splits)); + RETURN_IF_ERROR( + base_tablet->capture_rs_readers_unlocked(versions_to_be_changed, &rs_splits)); if (rs_splits.empty()) { res = Status::Error( "fail to acquire all data sources. version_num={}, data_source_num={}", @@ -985,8 +986,8 @@ Status SchemaChangeHandler::_get_versions_to_be_changed( } *max_rowset = rowset; - RETURN_IF_ERROR(base_tablet->capture_consistent_versions(Version(0, rowset->version().second), - versions_to_be_changed, false, false)); + RETURN_IF_ERROR(base_tablet->capture_consistent_versions_unlocked( + Version(0, rowset->version().second), versions_to_be_changed, false, false)); return Status::OK(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 720cd5b2350b72..50ec23040967ab 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -757,7 +757,8 @@ void Tablet::delete_expired_stale_rowset() { Version test_version = Version(0, lastest_delta->end_version()); stale_version_path_map[*path_id_iter] = version_path; - Status status = capture_consistent_versions(test_version, nullptr, false, false); + Status status = + capture_consistent_versions_unlocked(test_version, nullptr, false, false); // 1. When there is no consistent versions, we must reconstruct the tracker. if (!status.ok()) { // 2. fetch missing version after delete @@ -867,19 +868,9 @@ void Tablet::delete_expired_stale_rowset() { #endif } -bool Tablet::_reconstruct_version_tracker_if_necessary() { - double orphan_vertex_ratio = _timestamped_version_tracker.get_orphan_vertex_ratio(); - if (orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio) { - _timestamped_version_tracker.construct_versioned_tracker( - _tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas()); - return true; - } - return false; -} - -Status Tablet::capture_consistent_versions(const Version& spec_version, - std::vector* version_path, - bool skip_missing_version, bool quiet) const { +Status Tablet::capture_consistent_versions_unlocked(const Version& spec_version, + std::vector* version_path, + bool skip_missing_version, bool quiet) const { Status status = _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); if (!status.ok() && !quiet) { @@ -914,10 +905,10 @@ Status Tablet::capture_consistent_versions(const Version& spec_version, Status Tablet::check_version_integrity(const Version& version, bool quiet) { std::shared_lock rdlock(_meta_lock); - return capture_consistent_versions(version, nullptr, false, quiet); + return capture_consistent_versions_unlocked(version, nullptr, false, quiet); } -bool Tablet::exceed_version_limit(int32_t limit) const { +bool Tablet::exceed_version_limit(int32_t limit) { if (_tablet_meta->version_count() > limit) { exceed_version_limit_counter << 1; return true; @@ -947,7 +938,8 @@ void Tablet::acquire_version_and_rowsets( Status Tablet::capture_consistent_rowsets(const Version& spec_version, std::vector* rowsets) const { std::vector version_path; - RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path, false, false)); + RETURN_IF_ERROR( + capture_consistent_versions_unlocked(spec_version, &version_path, false, false)); RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path, rowsets)); return Status::OK(); } @@ -984,39 +976,12 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector& } Status Tablet::capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - bool skip_missing_version) const { + bool skip_missing_version) { + std::shared_lock rlock(_meta_lock); std::vector version_path; - RETURN_IF_ERROR( - capture_consistent_versions(spec_version, &version_path, skip_missing_version, false)); - RETURN_IF_ERROR(capture_rs_readers(version_path, rs_splits)); - return Status::OK(); -} - -Status Tablet::capture_rs_readers(const std::vector& version_path, - std::vector* rs_splits) const { - DCHECK(rs_splits != nullptr && rs_splits->empty()); - for (auto version : version_path) { - auto it = _rs_version_map.find(version); - if (it == _rs_version_map.end()) { - VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" << tablet_id() - << ", version='" << version.first << "-" << version.second; - - it = _stale_rs_version_map.find(version); - if (it == _stale_rs_version_map.end()) { - return Status::Error( - "fail to find Rowset in stale_rs_version for version. tablet={}, " - "version={}-{}", - tablet_id(), version.first, version.second); - } - } - RowsetReaderSharedPtr rs_reader; - auto res = it->second->create_reader(&rs_reader); - if (!res.ok()) { - return Status::Error( - "failed to create reader for rowset:{}", it->second->rowset_id().to_string()); - } - rs_splits->push_back(RowSetSplits(std::move(rs_reader))); - } + RETURN_IF_ERROR(capture_consistent_versions_unlocked(spec_version, &version_path, + skip_missing_version, false)); + RETURN_IF_ERROR(capture_rs_readers_unlocked(version_path, rs_splits)); return Status::OK(); } @@ -1419,16 +1384,6 @@ std::vector Tablet::pick_candidate_rowsets_to_build_inverted_in return candidate_rowsets; } -std::string Tablet::_get_rowset_info_str(RowsetSharedPtr rowset, bool delete_flag) { - const Version& ver = rowset->version(); - std::string disk_size = PrettyPrinter::print( - static_cast(rowset->rowset_meta()->total_disk_size()), TUnit::BYTES); - return strings::Substitute("[$0-$1] $2 $3 $4 $5 $6", ver.first, ver.second, - rowset->num_segments(), (delete_flag ? "DELETE" : "DATA"), - SegmentsOverlapPB_Name(rowset->rowset_meta()->segments_overlap()), - rowset->rowset_id().to_string(), disk_size); -} - // For http compaction action void Tablet::get_compaction_status(std::string* json_result) { rapidjson::Document root; @@ -1523,17 +1478,16 @@ void Tablet::get_compaction_status(std::string* json_result) { versions_arr.SetArray(); missing_versions_arr.SetArray(); int64_t last_version = -1; - for (int i = 0; i < rowsets.size(); ++i) { - const Version& ver = rowsets[i]->version(); + for (auto& rowset : rowsets) { + const Version& ver = rowset->version(); if (ver.first != last_version + 1) { rapidjson::Value miss_value; - miss_value.SetString( - strings::Substitute("[$0-$1]", last_version + 1, ver.first - 1).c_str(), - missing_versions_arr.GetAllocator()); + miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(), + missing_versions_arr.GetAllocator()); missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator()); } rapidjson::Value value; - std::string version_str = _get_rowset_info_str(rowsets[i], delete_flags[i]); + std::string version_str = rowset->get_rowset_info_str(); value.SetString(version_str.c_str(), version_str.length(), versions_arr.GetAllocator()); versions_arr.PushBack(value, versions_arr.GetAllocator()); last_version = ver.second; @@ -1544,15 +1498,9 @@ void Tablet::get_compaction_status(std::string* json_result) { // print all stale rowsets' version as an array rapidjson::Document stale_versions_arr; stale_versions_arr.SetArray(); - for (int i = 0; i < stale_rowsets.size(); ++i) { - const Version& ver = stale_rowsets[i]->version(); + for (auto& rowset : stale_rowsets) { rapidjson::Value value; - std::string disk_size = PrettyPrinter::print( - static_cast(stale_rowsets[i]->rowset_meta()->total_disk_size()), - TUnit::BYTES); - std::string version_str = strings::Substitute( - "[$0-$1] $2 $3 $4", ver.first, ver.second, stale_rowsets[i]->num_segments(), - stale_rowsets[i]->rowset_id().to_string(), disk_size); + std::string version_str = rowset->get_rowset_info_str(); value.SetString(version_str.c_str(), version_str.length(), stale_versions_arr.GetAllocator()); stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator()); @@ -3449,8 +3397,8 @@ Status Tablet::check_rowid_conversion( Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { // Ensure that the obtained versions of rowsets are continuous std::vector version_path; - RETURN_IF_ERROR( - capture_consistent_versions(Version(0, max_version), &version_path, false, false)); + RETURN_IF_ERROR(capture_consistent_versions_unlocked(Version(0, max_version), &version_path, + false, false)); for (auto& ver : version_path) { if (ver.second == 1) { // [0-1] rowset is empty for each tablet, skip it @@ -3719,8 +3667,7 @@ Status Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in if (rowsets != nullptr) { for (const auto& rowset : *rowsets) { rapidjson::Value value; - std::string version_str = - _get_rowset_info_str(rowset, rowset->rowset_meta()->has_delete_predicate()); + std::string version_str = rowset->get_rowset_info_str(); value.SetString(version_str.c_str(), version_str.length(), required_rowsets_arr.GetAllocator()); required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator()); @@ -3733,8 +3680,7 @@ Status Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in } for (const auto& rowset : rowsets) { rapidjson::Value value; - std::string version_str = - _get_rowset_info_str(rowset, rowset->rowset_meta()->has_delete_predicate()); + std::string version_str = rowset->get_rowset_info_str(); value.SetString(version_str.c_str(), version_str.length(), required_rowsets_arr.GetAllocator()); required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator()); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index d6ad0285233f38..d953d8fce4fcb0 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -123,7 +123,7 @@ class Tablet final : public BaseTablet { size_t num_rows(); int version_count() const; - bool exceed_version_limit(int32_t limit) const override; + bool exceed_version_limit(int32_t limit) override; uint64_t segment_count() const; Version max_version() const; Version max_version_unlocked() const; @@ -170,9 +170,9 @@ class Tablet final : public BaseTablet { // Given spec_version, find a continuous version path and store it in version_path. // If quiet is true, then only "does this path exist" is returned. // If skip_missing_version is true, return ok even there are missing versions. - Status capture_consistent_versions(const Version& spec_version, - std::vector* version_path, - bool skip_missing_version, bool quiet) const; + Status capture_consistent_versions_unlocked(const Version& spec_version, + std::vector* version_path, + bool skip_missing_version, bool quiet) const; // if quiet is true, no error log will be printed if there are missing versions Status check_version_integrity(const Version& version, bool quiet = false); bool check_version_exist(const Version& version) const; @@ -183,10 +183,7 @@ class Tablet final : public BaseTablet { std::vector* rowsets) const; // If skip_missing_version is true, skip versions if they are missing. Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - bool skip_missing_version) const override; - - Status capture_rs_readers(const std::vector& version_path, - std::vector* rs_splits) const; + bool skip_missing_version) override; // meta lock std::shared_mutex& get_header_lock() { return _meta_lock; } @@ -583,9 +580,6 @@ class Tablet final : public BaseTablet { std::shared_ptr cumulative_compaction_policy); uint32_t _calc_base_compaction_score() const; - // When the proportion of empty edges in the adjacency matrix used to represent the version graph - // in the version tracker is greater than the threshold, rebuild the version tracker - bool _reconstruct_version_tracker_if_necessary(); void _init_context_common_fields(RowsetWriterContext& context); void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const RowsetIdUnorderedSet& pre, @@ -607,7 +601,6 @@ class Tablet final : public BaseTablet { //////////////////////////////////////////////////////////////////////////// void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap); - std::string _get_rowset_info_str(RowsetSharedPtr rowset, bool delete_flag); public: static const int64_t K_INVALID_CUMULATIVE_POINT = -1; @@ -615,7 +608,6 @@ class Tablet final : public BaseTablet { private: StorageEngine& _engine; DataDir* _data_dir = nullptr; - TimestampedVersionTracker _timestamped_version_tracker; DorisCallOnce _init_once; // meta store lock is used for prevent 2 threads do checkpoint concurrently @@ -634,13 +626,6 @@ class Tablet final : public BaseTablet { // during publish_txn, which might take hundreds of milliseconds mutable std::mutex _rowset_update_lock; - // After version 0.13, all newly created rowsets are saved in _rs_version_map. - // And if rowset being compacted, the old rowsetis will be saved in _stale_rs_version_map; - std::unordered_map _rs_version_map; - // This variable _stale_rs_version_map is used to record these rowsets which are be compacted. - // These _stale rowsets are been removed when rowsets' pathVersion is expired, - // this policy is judged and computed by TimestampedVersionTracker. - std::unordered_map _stale_rs_version_map; // if this tablet is broken, set to true. default is false std::atomic _is_bad; // timestamp of last cumu compaction failure diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index c8a0387082c9a2..f14622081d91f8 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -36,6 +36,7 @@ #include "olap/file_header.h" #include "olap/olap_common.h" #include "olap/olap_define.h" +#include "olap/rowset/rowset.h" #include "olap/tablet_meta_manager.h" #include "olap/utils.h" #include "util/debug_points.h" @@ -727,6 +728,12 @@ Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) { return Status::OK(); } +void TabletMeta::add_rowsets_unchecked(const std::vector& to_add) { + for (const auto& rs : to_add) { + _rs_metas.push_back(rs->rowset_meta()); + } +} + void TabletMeta::delete_rs_meta_by_version(const Version& version, std::vector* deleted_rs_metas) { auto it = _rs_metas.begin(); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index db7912a452c083..094bb21507d5c1 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -202,6 +202,12 @@ class TabletMeta { // used for after tablet cloned to clear stale rowset void clear_stale_rowset() { _stale_rs_metas.clear(); } + void clear_rowsets() { _rs_metas.clear(); } + + // MUST hold EXCLUSIVE `_meta_lock` in belonged Tablet + // `to_add` MUST NOT have overlapped version with `_rs_metas` in tablet meta. + void add_rowsets_unchecked(const std::vector& to_add); + bool all_beta() const; int64_t storage_policy_id() const { return _storage_policy_id; } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index e1f39f2948b314..f7fe7f813f231c 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -528,16 +528,12 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { if (_shared_scan_opt) { auto& read_source = tablets_read_source.emplace_back(); - { - std::shared_lock rdlock(tablet->get_header_lock()); - auto st = - tablet->capture_rs_readers({0, version}, &read_source.rs_splits, false); - if (!st.ok()) { - LOG(WARNING) << "fail to init reader.res=" << st; - return Status::InternalError( - "failed to initialize storage reader. tablet_id={} : {}", - tablet->tablet_id(), st.to_string()); - } + auto st = tablet->capture_rs_readers({0, version}, &read_source.rs_splits, false); + if (!st.ok()) { + LOG(WARNING) << "fail to init reader.res=" << st; + return Status::InternalError( + "failed to initialize storage reader. tablet_id={} : {}", + tablet->tablet_id(), st.to_string()); } if (!_state->skip_delete_predicate()) { read_source.fill_delete_predicates(); diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index bc15cf7207fb80..20938ecb8ba2ce 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -182,15 +182,12 @@ Status NewOlapScanner::init() { // to prevent this case: when there are lots of olap scanners to run for example 10000 // the rowsets maybe compacted when the last olap scanner starts ReadSource read_source; - { - std::shared_lock rdlock(tablet->get_header_lock()); - auto st = tablet->capture_rs_readers(_tablet_reader_params.version, - &read_source.rs_splits, - _state->skip_missing_version()); - if (!st.ok()) { - LOG(WARNING) << "fail to init reader.res=" << st; - return st; - } + auto st = tablet->capture_rs_readers(_tablet_reader_params.version, + &read_source.rs_splits, + _state->skip_missing_version()); + if (!st.ok()) { + LOG(WARNING) << "fail to init reader.res=" << st; + return st; } if (!_state->skip_delete_predicate()) { read_source.fill_delete_predicates();