Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](cooldown)backup cooldown data #45589

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
3 changes: 1 addition & 2 deletions be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ class S3FileSystem final : public RemoteFileSystem {
// so no need to concat with prefix
abs_path = path;
} else {
// path with no schema
abs_path = _prefix / path;
abs_path = std::filesystem::path(fmt::format("s3://{}/{}", _bucket, _prefix)) / path;
}
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1314,7 +1314,7 @@ void StorageEngine::do_remove_unused_remote_files() {
}
cooldown_meta_id = t->tablet_meta()->cooldown_meta_id();
}
auto [cooldown_replica_id, cooldown_term] = t->cooldown_conf();
auto [cooldown_term, cooldown_replica_id] = t->cooldown_conf();
if (cooldown_replica_id != t->replica_id()) {
return;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ Status BetaRowset::upload_to(const StorageResource& dest_fs, const RowsetId& new
return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}",
_rowset_meta->tablet_id(), rowset_id().to_string());
}

if (num_segments() < 1) {
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ bool RowsetMeta::has_variant_type_in_schema() const {
return _schema && _schema->num_variant_columns() > 0;
}

void RowsetMeta::clear_resource_id() {
_rowset_meta_pb.clear_resource_id();
}

void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema) const {
*rs_meta_pb = _rowset_meta_pb;
if (_schema) [[likely]] {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {

void set_remote_storage_resource(StorageResource resource);

void clear_resource_id();

const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); }

bool is_local() const { return !_rowset_meta_pb.has_resource_id(); }
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/single_replica_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ Status SingleReplicaCompaction::_fetch_rowset(const TReplicaInfo& addr, const st
RETURN_IF_ERROR(_download_files(tablet()->data_dir(), remote_url_prefix, local_path));
_pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids(
local_path, _tablet->tablet_id(), tablet()->replica_id(), _tablet->table_id(),
_tablet->partition_id(), _tablet->schema_hash()));
_tablet->partition_id(), _tablet->schema_hash(), false, 0));
// 4: finish_clone: create output_rowset and link file
return _finish_clone(local_data_path, rowset_version);
}
Expand Down
17 changes: 11 additions & 6 deletions be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -140,7 +141,7 @@ Status SnapshotManager::release_snapshot(const string& snapshot_path) {

Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
const std::string& clone_dir, int64_t tablet_id, int64_t replica_id, int64_t table_id,
int64_t partition_id, int32_t schema_hash) {
int64_t partition_id, int32_t schema_hash, bool is_restore, int64_t storage_policy_id) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
std::vector<PendingRowsetGuard> guards;
// check clone dir existed
Expand Down Expand Up @@ -170,6 +171,10 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
new_tablet_meta_pb.set_tablet_id(tablet_id);
*new_tablet_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto();
new_tablet_meta_pb.set_replica_id(replica_id);
if (is_restore) {
new_tablet_meta_pb.set_storage_policy_id(storage_policy_id);
new_tablet_meta_pb.clear_cooldown_meta_id();
}
if (table_id > 0) {
new_tablet_meta_pb.set_table_id(table_id);
}
Expand Down Expand Up @@ -202,6 +207,9 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
} else {
// remote rowset
*rowset_meta = visible_rowset;
if (is_restore) {
rowset_meta->clear_resource_id();
}
}

rowset_meta->set_tablet_id(tablet_id);
Expand Down Expand Up @@ -489,11 +497,8 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
if (!is_single_rowset_clone && (!res.ok() || request.missing_version.empty())) {
if (!request.__isset.missing_version &&
ref_tablet->tablet_meta()->cooldown_meta_id().initialized()) {
LOG(WARNING) << "currently not support backup tablet with cooldowned remote "
"data. tablet="
<< request.tablet_id;
return Status::NotSupported(
"currently not support backup tablet with cooldowned remote data");
LOG(INFO) << "Backup tablet with cooldowned remote data. tablet="
<< request.tablet_id;
}
/// not all missing versions are found, fall back to full snapshot.
res = Status::OK(); // reset res
Expand Down
8 changes: 3 additions & 5 deletions be/src/olap/snapshot_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ class SnapshotManager {
// @param snapshot_path [in] 要被释放的snapshot的路径,只包含到ID
Status release_snapshot(const std::string& snapshot_path);

Result<std::vector<PendingRowsetGuard>> convert_rowset_ids(const std::string& clone_dir,
int64_t tablet_id,
int64_t replica_id, int64_t table_id,
int64_t partition_id,
int32_t schema_hash);
Result<std::vector<PendingRowsetGuard>> convert_rowset_ids(
const std::string& clone_dir, int64_t tablet_id, int64_t replica_id, int64_t table_id,
int64_t partition_id, int32_t schema_hash, bool is_restore, int64_t storage_policy_id);

private:
Status _calc_snapshot_id_path(const TabletSharedPtr& tablet, int64_t timeout_s,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
// No need to try again with another BE
_pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids(
local_data_path, _clone_req.tablet_id, _clone_req.replica_id, _clone_req.table_id,
_clone_req.partition_id, _clone_req.schema_hash));
_clone_req.partition_id, _clone_req.schema_hash, false, 0));
break;
} // clone copy from one backend
return status;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ Status EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file(
// rowset create time is useful when load tablet from meta to check which tablet is the tablet to load
_pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids(
full_path, tablet_id, _tablet->replica_id(), _tablet->table_id(),
_tablet->partition_id(), schema_hash));
_tablet->partition_id(), schema_hash, false, 0));
return Status::OK();
}

Expand Down
190 changes: 167 additions & 23 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "gutil/strings/split.h"
#include "http/http_client.h"
#include "io/fs/broker_file_system.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
Expand All @@ -46,8 +47,10 @@
#include "io/fs/s3_file_system.h"
#include "io/hdfs_builder.h"
#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "runtime/client_cache.h"
Expand Down Expand Up @@ -120,6 +123,161 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l

SnapshotLoader::~SnapshotLoader() = default;

static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
const std::string& dir, const std::string& rowset,
std::vector<std::string>* remote_files) {
bool exists = true;
std::vector<io::FileInfo> files;
RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
for (auto& tmp_file : files) {
io::Path path(tmp_file.file_name);
std::string file_name = path.filename();

if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
!_end_with(file_name, ".idx")) {
continue;
}
remote_files->push_back(file_name);
}

return Status::OK();
}

static Status check_need_upload(const std::string& src_path, const std::string& local_file,
std::map<std::string, FileStat>& remote_files, std::string* md5sum,
bool* need_upload) {
// calc md5sum of localfile
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(src_path + "/" + local_file, md5sum));
VLOG_CRITICAL << "get file checksum: " << local_file << ": " << *md5sum;

// check if this local file need upload
auto find = remote_files.find(local_file);
if (find != remote_files.end()) {
if (*md5sum != find->second.md5) {
// remote storage file exist, but with different checksum
LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first
<< ", local: " << *md5sum;
// TODO(cmy): save these files and delete them later
*need_upload = true;
}
} else {
*need_upload = true;
}

return Status::OK();
}

static Status download_and_upload_one_cold_file(
io::RemoteFileSystem& dest_fs, const StorageResource& cold_fs,
const std::string& remote_seg_path, const std::string& local_seg_path,
const std::string& dest_seg_path, const std::string& local_path,
const std::string& local_file, std::map<std::string, FileStat>& remote_files) {
RETURN_IF_ERROR(cold_fs.fs.get()->download(remote_seg_path, local_seg_path));

bool need_upload = false;
std::string md5sum;
RETURN_IF_ERROR(check_need_upload(local_path, local_file, remote_files, &md5sum, &need_upload));

if (!need_upload) {
VLOG_CRITICAL << "cold file exist in remote path, no need to upload: " << local_file;
return Status::OK();
}

RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path, dest_seg_path, md5sum));

//delete local file
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));

return Status::OK();
}

static Status upload_remote_cold_rowset(io::RemoteFileSystem& dest_fs, int64_t tablet_id,
const std::string& local_path, const std::string& dest_path,
const StorageResource& cold_fs,
const std::string& rowset_id, int segments,
int have_inverted_index,
std::map<std::string, FileStat>& remote_files) {
Status res = Status::OK();

std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX, tablet_id);

for (int i = 0; i < segments; i++) {
std::string local_file = fmt::format("{}_{}.dat", rowset_id, i);
std::string remote_seg_path = cold_fs.remote_segment_path(tablet_id, rowset_id, i);
std::string local_seg_path = local_segment_path(local_path, rowset_id, i);
std::string dest_seg_path = local_segment_path(dest_path, rowset_id, i);

RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_seg_path,
local_seg_path, dest_seg_path, local_path,
local_file, remote_files));
}

if (!have_inverted_index) {
return res;
}

std::vector<std::string> remote_index_files;
RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs.fs.get(), remote_tablet_path,
rowset_id, &remote_index_files));

for (auto& index_file : remote_index_files) {
std::string remote_index_path = fmt::format("{}/{}", remote_tablet_path, index_file);
std::string local_seg_path = fmt::format("{}/{}", local_path, index_file);
std::string dest_seg_path = fmt::format("{}/{}", dest_path, index_file);

RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_index_path,
local_seg_path, dest_seg_path, local_path,
index_file, remote_files));
}
return res;
}

/*
* get the cooldown data info from the hdr file, download the cooldown data and
* upload it to remote storage.
*/
static Status upload_remote_cold_file(io::RemoteFileSystem& dest_fs, int64_t tablet_id,
const std::string& local_path, const std::string& dest_path,
std::map<std::string, FileStat>& remote_files) {
Status res = Status::OK();
std::string hdr_file = local_path + "/" + std::to_string(tablet_id) + ".hdr";
auto tablet_meta = std::make_shared<TabletMeta>();
res = tablet_meta->create_from_file(hdr_file);
if (!res.ok()) {
return Status::Error<ErrorCode::ENGINE_LOAD_INDEX_TABLE_ERROR>(
"fail to load tablet_meta. file_path={}", hdr_file);
}

if (tablet_meta->tablet_id() != tablet_id) {
return Status::InternalError("Invalid tablet {}", tablet_meta->tablet_id());
}

if (!tablet_meta->cooldown_meta_id().initialized()) {
return res;
}

string rowset_id;
int segments;
int have_inverted_index;

auto storage_resource =
DORIS_TRY(get_resource_by_storage_policy_id(tablet_meta->storage_policy_id()));

for (auto rowset_meta : tablet_meta->all_rs_metas()) {
rowset_id = rowset_meta->rowset_id().to_string();
segments = rowset_meta->num_segments();
have_inverted_index = rowset_meta->tablet_schema()->has_inverted_index();

if (segments > 0 && !rowset_meta->is_local()) {
RETURN_IF_ERROR(upload_remote_cold_rowset(dest_fs, tablet_id, local_path, dest_path,
storage_resource, rowset_id, segments,
have_inverted_index, remote_files));
}
}

return res;
}

Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path,
std::map<int64_t, std::vector<std::string>>* tablet_files) {
if (!_remote_fs) {
Expand Down Expand Up @@ -168,29 +326,11 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
for (auto& local_file : local_files) {
RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
TTaskType::type::UPLOAD));

// calc md5sum of localfile
bool need_upload = false;
std::string md5sum;
RETURN_IF_ERROR(
io::global_local_filesystem()->md5sum(src_path + "/" + local_file, &md5sum));
VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum;
check_need_upload(src_path, local_file, remote_files, &md5sum, &need_upload));
local_files_with_checksum.push_back(local_file + "." + md5sum);

// check if this local file need upload
bool need_upload = false;
auto find = remote_files.find(local_file);
if (find != remote_files.end()) {
if (md5sum != find->second.md5) {
// remote storage file exist, but with different checksum
LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first
<< ", local: " << md5sum;
// TODO(cmy): save these files and delete them later
need_upload = true;
}
} else {
need_upload = true;
}

if (!need_upload) {
VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file;
continue;
Expand All @@ -202,6 +342,10 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum));
} // end for each tablet's local files

// 2.4. upload cooldown data files
RETURN_IF_ERROR(
upload_remote_cold_file(*_remote_fs, tablet_id, src_path, dest_path, remote_files));

tablet_files->emplace(tablet_id, local_files_with_checksum);
finished_num++;
LOG(INFO) << "finished to write tablet to remote. local path: " << src_path
Expand Down Expand Up @@ -754,9 +898,9 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta
}

// rename the rowset ids and tabletid info in rowset meta
auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id,
tablet->replica_id(), tablet->table_id(),
tablet->partition_id(), schema_hash);
auto res = _engine.snapshot_mgr()->convert_rowset_ids(
snapshot_path, tablet_id, tablet->replica_id(), tablet->table_id(),
tablet->partition_id(), schema_hash, true, tablet->storage_policy_id());
if (!res.has_value()) [[unlikely]] {
auto err_msg =
fmt::format("failed to convert rowsetids in snapshot: {}, tablet path: {}, err: {}",
Expand Down
Loading
Loading