From 2738cedcd6c0fe4c5ff74d67c0a58d0aff84fee3 Mon Sep 17 00:00:00 2001 From: ayuanzhang Date: Tue, 12 Nov 2024 11:23:32 +0800 Subject: [PATCH] [feature](backup) backup restore cooldown data --- be/src/io/fs/s3_file_system.h | 4 +- be/src/olap/rowset/rowset_meta.h | 2 + be/src/olap/single_replica_compaction.cpp | 2 +- be/src/olap/snapshot_manager.cpp | 18 +- be/src/olap/snapshot_manager.h | 3 +- be/src/olap/tablet_schema.h | 8 + be/src/olap/task/engine_clone_task.cpp | 2 +- .../task/engine_storage_migration_task.cpp | 2 +- be/src/runtime/snapshot_loader.cpp | 198 ++- be/src/runtime/snapshot_loader.h | 2 - .../Backup-and-Restore/RESTORE.md | 2 + .../apache/doris/analysis/RestoreStmt.java | 35 + .../apache/doris/backup/BackupHandler.java | 6 +- .../org/apache/doris/backup/BackupJob.java | 38 +- .../apache/doris/backup/BackupJobInfo.java | 30 + .../org/apache/doris/backup/BackupMeta.java | 28 +- .../org/apache/doris/backup/RestoreJob.java | 225 ++- .../org/apache/doris/catalog/OlapTable.java | 6 +- .../apache/doris/catalog/PartitionInfo.java | 8 +- .../org/apache/doris/catalog/ResourceMgr.java | 7 + .../org/apache/doris/catalog/S3Resource.java | 41 +- .../apache/doris/policy/DropPolicyLog.java | 5 + .../java/org/apache/doris/policy/Policy.java | 4 + .../org/apache/doris/policy/PolicyMgr.java | 25 +- .../apache/doris/policy/StoragePolicy.java | 57 +- .../doris/service/FrontendServiceImpl.java | 13 + .../doris/backup/BackupHandlerTest.java | 4 +- .../apache/doris/backup/RestoreJobTest.java | 6 +- .../test_backup_restore_cold_data.groovy | 1506 +++++++++++++++++ 29 files changed, 2207 insertions(+), 80 deletions(-) create mode 100644 regression-test/suites/backup_restore/test_backup_restore_cold_data.groovy diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index 0da142881fe353..19d4801c27b38f 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -103,7 +103,9 @@ class S3FileSystem final : public RemoteFileSystem { return path; } else { // path with no schema - return _root_path / path; + return std::filesystem::path( + fmt::format("s3://{}/{}", _s3_conf.bucket, _s3_conf.prefix)) / + path; } } diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 24e7dfbefb73b6..d600a7771b3a19 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -107,6 +107,8 @@ class RowsetMeta { _fs = std::move(fs); } + void clear_resource_id() { _rowset_meta_pb.clear_resource_id(); } + const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); } bool is_local() const { return !_rowset_meta_pb.has_resource_id(); } diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp index fdccc78816f09b..5c5ff5a1121cb4 100644 --- a/be/src/olap/single_replica_compaction.cpp +++ b/be/src/olap/single_replica_compaction.cpp @@ -322,7 +322,7 @@ Status SingleReplicaCompaction::_fetch_rowset(const TReplicaInfo& addr, const st // change all rowset ids because they maybe its id same with local rowset auto olap_st = SnapshotManager::instance()->convert_rowset_ids( local_path, _tablet->tablet_id(), _tablet->replica_id(), _tablet->table_id(), - _tablet->partition_id(), _tablet->schema_hash()); + _tablet->partition_id(), _tablet->schema_hash(), false, 0); if (!olap_st.ok()) { LOG(WARNING) << "fail to convert rowset ids, path=" << local_path << ", tablet_id=" << _tablet->tablet_id() << ", error=" << olap_st; diff --git a/be/src/olap/snapshot_manager.cpp 
b/be/src/olap/snapshot_manager.cpp index 6b97591014a25a..041f494f7bb6cc 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -38,6 +38,7 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "olap/data_dir.h" #include "olap/olap_common.h" @@ -146,7 +147,8 @@ Status SnapshotManager::release_snapshot(const string& snapshot_path) { Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t tablet_id, int64_t replica_id, int64_t table_id, - int64_t partition_id, const int32_t& schema_hash) { + int64_t partition_id, const int32_t& schema_hash, + bool is_restore, int64_t storage_policy_id) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); Status res = Status::OK(); // check clone dir existed @@ -181,6 +183,10 @@ Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t new_tablet_meta_pb.set_tablet_id(tablet_id); *new_tablet_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto(); new_tablet_meta_pb.set_replica_id(replica_id); + if (is_restore) { + new_tablet_meta_pb.set_storage_policy_id(storage_policy_id); + new_tablet_meta_pb.clear_cooldown_meta_id(); + } if (table_id > 0) { new_tablet_meta_pb.set_table_id(table_id); } @@ -212,6 +218,9 @@ Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t } else { // remote rowset *rowset_meta = visible_rowset; + if (is_restore) { + rowset_meta->clear_resource_id(); + } } rowset_meta->set_tablet_id(tablet_id); @@ -521,11 +530,8 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet if (!is_single_rowset_clone && (!res.ok() || request.missing_version.empty())) { if (!request.__isset.missing_version && ref_tablet->tablet_meta()->cooldown_meta_id().initialized()) { - LOG(WARNING) << "currently not support backup tablet with cooldowned remote " - "data. tablet=" - << request.tablet_id; - return Status::NotSupported( - "currently not support backup tablet with cooldowned remote data"); + LOG(INFO) << "Backup tablet with cooldowned remote data. tablet=" + << request.tablet_id; } /// not all missing versions are found, fall back to full snapshot. 
res = Status::OK(); // reset res diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h index 78b9db8659b3e9..9b042277fde8c2 100644 --- a/be/src/olap/snapshot_manager.h +++ b/be/src/olap/snapshot_manager.h @@ -55,7 +55,8 @@ class SnapshotManager { static SnapshotManager* instance(); Status convert_rowset_ids(const std::string& clone_dir, int64_t tablet_id, int64_t replica_id, - int64_t table_id, int64_t partition_id, const int32_t& schema_hash); + int64_t table_id, int64_t partition_id, const int32_t& schema_hash, + bool is_restore, int64_t storage_policy_id); private: SnapshotManager() : _snapshot_base_id(0) { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 2ffbfb0ffe8ee3..a4e88c9cc75e32 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -269,6 +269,14 @@ class TabletSchema { segment_v2::CompressionTypePB compression_type() const { return _compression_type; } const std::vector<TabletIndex>& indexes() const { return _indexes; } + [[nodiscard]] bool has_inverted_index() const { + for (const auto& index : _indexes) { + if (index.index_type() == IndexType::INVERTED) { + return true; + } + } + return false; + } std::vector<TabletIndex> get_indexes_for_column(int32_t col_unique_id) const; bool has_inverted_index(int32_t col_unique_id) const; bool has_inverted_index_with_index_id(int64_t index_id) const; diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 0b077076e574cb..cc816b25308e68 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -407,7 +407,7 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir, // change all rowset ids because their ids may collide with those of local rowsets status = SnapshotManager::instance()->convert_rowset_ids( local_data_path, _clone_req.tablet_id, _clone_req.replica_id, - _clone_req.table_id, _clone_req.partition_id, _clone_req.schema_hash); + _clone_req.table_id, _clone_req.partition_id, _clone_req.schema_hash, false, 0); } else { LOG_WARNING("failed to download snapshot from remote BE") .tag("url", _mask_token(remote_url_prefix)) diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 218922069c7bd6..c85109c9a084e1 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -158,7 +158,7 @@ Status EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file( // rowset create time is useful when loading a tablet from meta, to check which tablet to load return SnapshotManager::instance()->convert_rowset_ids( full_path, tablet_id, _tablet->replica_id(), _tablet->table_id(), - _tablet->partition_id(), schema_hash); + _tablet->partition_id(), schema_hash, false, 0); } Status EngineStorageMigrationTask::_reload_tablet(const std::string& full_path) { diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index d4891bb383839e..d703b03b638474 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -41,6 +41,7 @@ #include "gutil/strings/split.h" #include "http/http_client.h" #include "io/fs/broker_file_system.h" +#include "io/fs/file_reader.h" #include "io/fs/file_system.h" #include "io/fs/hdfs_file_system.h" #include "io/fs/local_file_system.h" @@ -49,8 +50,10 @@ #include "io/fs/s3_file_system.h" #include "io/hdfs_builder.h" #include "olap/data_dir.h" +#include "olap/olap_define.h" #include
"olap/snapshot_manager.h" #include "olap/storage_engine.h" +#include "olap/storage_policy.h" #include "olap/tablet.h" #include "olap/tablet_manager.h" #include "runtime/client_cache.h" @@ -100,6 +103,166 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l SnapshotLoader::~SnapshotLoader() = default; +bool _end_with(std::string_view str, std::string_view match) { + return str.size() >= match.size() && + str.compare(str.size() - match.size(), match.size(), match) == 0; +} + +static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs, + const std::string& dir, const std::string& rowset, + std::vector* remote_files) { + bool exists = true; + std::vector files; + RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists)); + for (auto& tmp_file : files) { + io::Path path(tmp_file.file_name); + std::string file_name = path.filename(); + + if (file_name.substr(0, rowset.length()).compare(rowset) != 0 || + !_end_with(file_name, ".idx")) { + continue; + } + remote_files->push_back(file_name); + } + + return Status::OK(); +} + +static Status check_need_upload(const std::string& src_path, const std::string& local_file, + std::map& remote_files, std::string* md5sum, + bool* need_upload) { + // calc md5sum of localfile + RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(src_path + "/" + local_file, md5sum)); + VLOG_CRITICAL << "get file checksum: " << local_file << ": " << *md5sum; + + // check if this local file need upload + auto find = remote_files.find(local_file); + if (find != remote_files.end()) { + if (*md5sum != find->second.md5) { + // remote storage file exist, but with different checksum + LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first + << ", local: " << *md5sum; + // TODO(cmy): save these files and delete them later + *need_upload = true; + } + } else { + *need_upload = true; + } + + return Status::OK(); +} + +static Status download_and_upload_one_cold_file( + io::RemoteFileSystem& dest_fs, io::RemoteFileSystem* cold_fs, + const std::string& remote_seg_path, const std::string& local_seg_path, + const std::string& dest_seg_path, const std::string& local_path, + const std::string& local_file, std::map& remote_files) { + RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path)); + + bool need_upload = false; + std::string md5sum; + RETURN_IF_ERROR(check_need_upload(local_path, local_file, remote_files, &md5sum, &need_upload)); + + if (!need_upload) { + VLOG_CRITICAL << "cold file exist in remote path, no need to upload: " << local_file; + return Status::OK(); + } + + RETURN_IF_ERROR(dest_fs.upload_with_checksum(local_seg_path, dest_seg_path, md5sum)); + + //delete local file + RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path)); + + return Status::OK(); +} + +static Status upload_remote_cold_rowset(io::RemoteFileSystem& dest_fs, int64_t tablet_id, + const std::string& local_path, const std::string& dest_path, + io::RemoteFileSystem* cold_fs, const std::string& rowset_id, + int segments, int have_inverted_index, + std::map& remote_files) { + Status res = Status::OK(); + + for (int i = 0; i < segments; i++) { + std::string local_file = fmt::format("{}_{}.dat", rowset_id, i); + std::string remote_seg_path = + fmt::format("{}/{}_{}.dat", remote_tablet_path(tablet_id), rowset_id, i); + std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path, rowset_id, i); + std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path, rowset_id, i); + + 
RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_seg_path, + local_seg_path, dest_seg_path, local_path, + local_file, remote_files)); + } + + if (!have_inverted_index) { + return res; + } + + std::vector remote_index_files; + RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs, remote_tablet_path(tablet_id), + rowset_id, &remote_index_files)); + + for (auto& index_file : remote_index_files) { + std::string remote_index_path = + fmt::format("{}/{}", remote_tablet_path(tablet_id), index_file); + std::string local_seg_path = fmt::format("{}/{}", local_path, index_file); + std::string dest_seg_path = fmt::format("{}/{}", dest_path, index_file); + + RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_index_path, + local_seg_path, dest_seg_path, local_path, + index_file, remote_files)); + } + return res; +} + +/* + * get the cooldown data info from the hdr file, download the cooldown data and + * upload it to remote storage. +*/ +static Status upload_remote_cold_file(io::RemoteFileSystem& dest_fs, int64_t tablet_id, + const std::string& local_path, const std::string& dest_path, + std::map& remote_files) { + Status res = Status::OK(); + std::string hdr_file = local_path + "/" + std::to_string(tablet_id) + ".hdr"; + + auto tablet_meta = std::make_shared(); + res = tablet_meta->create_from_file(hdr_file); + if (!res.ok()) { + return Status::Error( + "fail to load tablet_meta. file_path={}", hdr_file); + } + + if (tablet_meta->tablet_id() != tablet_id) { + return Status::InternalError("Invalid tablet {}", tablet_meta->tablet_id()); + } + + if (!tablet_meta->cooldown_meta_id().initialized()) { + return res; + } + + string rowset_id; + int segments; + int have_inverted_index; + + std::shared_ptr colddata_fs; + RETURN_IF_ERROR(get_remote_file_system(tablet_meta->storage_policy_id(), &colddata_fs)); + + for (auto rowset_meta : tablet_meta->all_rs_metas()) { + rowset_id = rowset_meta->rowset_id().to_string(); + segments = rowset_meta->num_segments(); + have_inverted_index = rowset_meta->tablet_schema()->has_inverted_index(); + + if (segments > 0 && !rowset_meta->is_local()) { + RETURN_IF_ERROR(upload_remote_cold_rowset(dest_fs, tablet_id, local_path, dest_path, + colddata_fs.get(), rowset_id, segments, + have_inverted_index, remote_files)); + } + } + + return res; +} + Status SnapshotLoader::upload(const std::map& src_to_dest_path, std::map>* tablet_files) { if (!_remote_fs) { @@ -150,28 +313,11 @@ Status SnapshotLoader::upload(const std::map& src_to_d TTaskType::type::UPLOAD)); const std::string& local_file = *it; - // calc md5sum of localfile + bool need_upload = false; std::string md5sum; RETURN_IF_ERROR( - io::global_local_filesystem()->md5sum(src_path + "/" + local_file, &md5sum)); - VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum; + check_need_upload(src_path, local_file, remote_files, &md5sum, &need_upload)); local_files_with_checksum.push_back(local_file + "." + md5sum); - - // check if this local file need upload - bool need_upload = false; - auto find = remote_files.find(local_file); - if (find != remote_files.end()) { - if (md5sum != find->second.md5) { - // remote storage file exist, but with different checksum - LOG(WARNING) << "remote file checksum is invalid. 
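Context for the new upload helpers above: cooldowned rowsets exist only for tables that carry a storage policy, so the .hdr meta read by upload_remote_cold_file() only points at remote data for such tables. A minimal SQL sketch of the setup this code path serves (every name, endpoint, and credential below is an illustrative assumption, not part of this patch):

```sql
-- Hypothetical S3 resource backing the cold tier (endpoint, bucket, and keys are placeholders).
CREATE RESOURCE "remote_s3"
PROPERTIES (
    "type" = "s3",
    "s3.endpoint" = "s3.example.com",
    "s3.region" = "example-region",
    "s3.bucket" = "doris-cold-data",
    "s3.root.path" = "cold/",
    "s3.access_key" = "<access_key>",
    "s3.secret_key" = "<secret_key>"
);

-- Policy that cools data down to the resource above after one day.
CREATE STORAGE POLICY cold_policy
PROPERTIES (
    "storage_resource" = "remote_s3",
    "cooldown_ttl" = "1d"
);

-- Table whose rowsets will eventually become the remote (cooldowned) files
-- that upload_remote_cold_file() downloads and re-uploads during BACKUP.
CREATE TABLE example_db.example_tbl (k INT, v STRING)
DUPLICATE KEY (k)
DISTRIBUTED BY HASH (k) BUCKETS 1
PROPERTIES (
    "replication_num" = "1",
    "storage_policy" = "cold_policy"
);
```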
remote: " << find->first - << ", local: " << md5sum; - // TODO(cmy): save these files and delete them later - need_upload = true; - } - } else { - need_upload = true; - } - if (!need_upload) { VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file; continue; @@ -184,6 +330,10 @@ Status SnapshotLoader::upload(const std::map& src_to_d _remote_fs->upload_with_checksum(full_local_file, full_remote_file, md5sum)); } // end for each tablet's local files + // 2.4. upload cooldown data files + RETURN_IF_ERROR( + upload_remote_cold_file(*_remote_fs, tablet_id, src_path, dest_path, remote_files)); + tablet_files->emplace(tablet_id, local_files_with_checksum); finished_num++; LOG(INFO) << "finished to write tablet to remote. local path: " << src_path @@ -734,7 +884,7 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta // rename the rowset ids and tabletid info in rowset meta Status convert_status = SnapshotManager::instance()->convert_rowset_ids( snapshot_path, tablet_id, tablet->replica_id(), tablet->table_id(), - tablet->partition_id(), schema_hash); + tablet->partition_id(), schema_hash, true, tablet->storage_policy_id()); if (!convert_status.ok()) { std::stringstream ss; ss << "failed to convert rowsetids in snapshot: " << snapshot_path @@ -804,14 +954,6 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta return status; } -bool SnapshotLoader::_end_with(const std::string& str, const std::string& match) { - if (str.size() >= match.size() && - str.compare(str.size() - match.size(), match.size(), match) == 0) { - return true; - } - return false; -} - Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const std::string& src_path, int64_t* tablet_id, int32_t* schema_hash) { diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h index c0d1f0f70864ce..ed8124a6f8788b 100644 --- a/be/src/runtime/snapshot_loader.h +++ b/be/src/runtime/snapshot_loader.h @@ -94,8 +94,6 @@ class SnapshotLoader { Status _get_existing_files_from_local(const std::string& local_path, std::vector* local_files); - bool _end_with(const std::string& str, const std::string& match); - Status _replace_tablet_id(const std::string& file_name, int64_t tablet_id, std::string* new_file_name); diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/RESTORE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/RESTORE.md index 8541500331b7dd..ac860eb0f2a280 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/RESTORE.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Backup-and-Restore/RESTORE.md @@ -60,6 +60,8 @@ PROPERTIES ("key"="value", ...); - "reserve_dynamic_partition_enable" = "true":默认为 false。当该属性为 true 时,恢复的表会保留该表备份之前的'dynamic_partition_enable'属性值。该值不为true时,则恢复出来的表的'dynamic_partition_enable'属性值会设置为false。 - "timeout" = "3600":任务超时时间,默认为一天。单位秒。 - "meta_version" = 40:使用指定的 meta_version 来读取之前备份的元数据。注意,该参数作为临时方案,仅用于恢复老版本 Doris 备份的数据。最新版本的备份数据中已经包含 meta version,无需再指定。 + - "reserve_storage_policy" = "true":指定的恢复的表是否保留冷热分层属性。默认为true,备份集中保存的storage policy和对应的resource信息将在新集群中重建。恢复时数据都会下载到本地,再由降冷策略上传到远程。reserve_storage_policy设置为false,恢复后的表去除了冷热属性, 变为普通表。 + - "storage_resource" = "resource_name":指定恢复后表的冷数据使用的resource。建议在跨集群恢复时指定此属性。注意恢复后的storage policy中的storage_resource属性也会更新为指定的storage_resource。若指定了"reserve_storage_policy"="false",则忽略storage_resource属性。 ### 
### Example diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index 9585a2e5069237..9b4d06421f1805 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -18,7 +18,9 @@ package org.apache.doris.analysis; import org.apache.doris.backup.Repository; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.catalog.Resource; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; @@ -44,18 +46,22 @@ public class RestoreStmt extends AbstractBackupStmt { public static final String PROP_CLEAN_TABLES = "clean_tables"; public static final String PROP_CLEAN_PARTITIONS = "clean_partitions"; public static final String PROP_ATOMIC_RESTORE = "atomic_restore"; + public static final String PROP_STORAGE_RESOURCE = "storage_resource"; + public static final String PROP_RESERVE_STORAGE_POLICY = "reserve_storage_policy"; private boolean allowLoad = false; private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; private String backupTimestamp = null; private int metaVersion = -1; private boolean reserveReplica = false; + private boolean reserveStoragePolicy = true; private boolean reserveDynamicPartitionEnable = false; private boolean isLocal = false; private boolean isBeingSynced = false; private boolean isCleanTables = false; private boolean isCleanPartitions = false; private boolean isAtomicRestore = false; + private String storageResource = ""; private byte[] meta = null; private byte[] jobInfo = null; @@ -83,6 +89,10 @@ public String getBackupTimestamp() { return backupTimestamp; } + public String getStorageResource() { + return storageResource; + } + public int getMetaVersion() { return metaVersion; } @@ -91,6 +101,10 @@ public boolean reserveReplica() { return reserveReplica; } + public boolean reserveStoragePolicy() { + return reserveStoragePolicy; + } + public boolean reserveDynamicPartitionEnable() { return reserveDynamicPartitionEnable; } @@ -208,6 +222,27 @@ public void analyzeProperties() throws AnalysisException { // is atomic restore isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore); + if (copiedProperties.containsKey(PROP_STORAGE_RESOURCE)) { + storageResource = copiedProperties.get(PROP_STORAGE_RESOURCE); + Resource localResource = Env.getCurrentEnv().getResourceMgr().getResource(storageResource); + + if (localResource == null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, + "Restore storage resource " + storageResource + " does not exist"); + } + + if (localResource.getType() != Resource.ResourceType.S3) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, + "The type of local resource " + + storageResource + " is not the same as the restored resource"); + } + + copiedProperties.remove(PROP_STORAGE_RESOURCE); + } + + // reserve storage policy + reserveStoragePolicy = eatBooleanProperty(copiedProperties, PROP_RESERVE_STORAGE_POLICY, reserveStoragePolicy); + if (!copiedProperties.isEmpty()) { ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, "Unknown restore job properties: " + copiedProperties.keySet()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index
a3fd66692a2928..758af5d2b2ed91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -460,14 +460,14 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), - stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), - env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); + stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.getStorageResource(), + stmt.reserveStoragePolicy(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); } else { restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(), db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), - env, repository.getId()); + stmt.getStorageResource(), stmt.reserveStoragePolicy(), env, repository.getId()); } env.getEditLog().logRestoreJob(restoreJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index dc92e9a07c3c1f..6155905d23aa64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.Table; @@ -39,6 +40,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.property.S3ClientBEProperties; import org.apache.doris.persist.BarrierLog; +import org.apache.doris.policy.StoragePolicy; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; @@ -58,6 +60,7 @@ import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -444,6 +447,7 @@ private void prepareAndSendSnapshotTask() { // copy all related schema at this moment List<Table> copiedTables = Lists.newArrayList(); List<Resource> copiedResources = Lists.newArrayList(); + List<StoragePolicy> copiedStoragePolicys = Lists.newArrayList(); AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); for (TableRef tableRef : tableRefs) { String tblName = tableRef.getName().getTbl(); @@ -461,7 +465,17 @@ private void prepareAndSendSnapshotTask() { if (getContent() == BackupContent.ALL) { prepareSnapshotTaskForOlapTableWithoutLock(db, (OlapTable) tbl, tableRef, batchTask); } - prepareBackupMetaForOlapTableWithoutLock(tableRef, olapTable, copiedTables); + prepareBackupMetaForOlapTableWithoutLock(tableRef, olapTable, copiedTables, + copiedStoragePolicys); + for (StoragePolicy policy : copiedStoragePolicys) { + Resource resource = Env.getCurrentEnv().getResourceMgr() + .getResource(policy.getStorageResource()); + if
(resource.getType() != Resource.ResourceType.S3) { + status = new Status(ErrCode.COMMON_ERROR, + "backup job only supports S3-type storage policies: " + resource.getType()); + return; + } + } break; case VIEW: prepareBackupMetaForViewWithoutLock((View) tbl, copiedTables); @@ -490,7 +504,7 @@ private void prepareAndSendSnapshotTask() { return; } - backupMeta = new BackupMeta(copiedTables, copiedResources); + backupMeta = new BackupMeta(copiedTables, copiedResources, copiedStoragePolicys); // send tasks for (AgentTask task : batchTask.getAllTasks()) { @@ -604,7 +618,8 @@ private void checkResourceForOdbcTable(OdbcTable odbcTable) { } private void prepareBackupMetaForOlapTableWithoutLock(TableRef tableRef, OlapTable olapTable, - List<Table> copiedTables) { + List<Table> copiedTables, + List<StoragePolicy> copiedStoragePolicys) { // only copy visible indexes List<String> reservedPartitions = tableRef.getPartitionNames() == null ? null : tableRef.getPartitionNames().getPartitionNames(); @@ -616,6 +631,23 @@ private void prepareBackupMetaForOlapTableWithoutLock(TableRef tableRef, OlapTab removeUnsupportProperties(copiedTbl); copiedTables.add(copiedTbl); + + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + // collect the distinct storage policies used by the table's partitions + for (Long partitionId : olapTable.getPartitionIds()) { + String policyName = partitionInfo.getDataProperty(partitionId).getStoragePolicy(); + if (StringUtils.isEmpty(policyName)) { + continue; + } + + StoragePolicy checkedPolicyCondition = StoragePolicy.ofCheck(policyName); + StoragePolicy storagePolicy = (StoragePolicy) Env.getCurrentEnv().getPolicyMgr() + .getPolicy(checkedPolicyCondition); + + if (storagePolicy != null && !copiedStoragePolicys.contains(storagePolicy)) { + copiedStoragePolicys.add(storagePolicy); + } + } } private void prepareBackupMetaForViewWithoutLock(View view, List<Table> copiedTables) {
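The type check added above means only S3-backed storage policies survive the backup path; the statement that triggers all of this is an ordinary BACKUP over a policy-bearing table. A hedged sketch (database, repository, and table names are assumptions):

```sql
-- With this patch, the snapshot captures the table, its storage policy,
-- the policy's S3 resource, and the already-cooldowned remote data.
BACKUP SNAPSHOT example_db.snapshot_label
TO example_repo
ON (example_tbl)
PROPERTIES ("type" = "full");
```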
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java index b918cddef56691..a63c6f234cc07e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Resource; +import org.apache.doris.catalog.S3Resource; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; @@ -38,6 +39,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.policy.StoragePolicy; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Joiner; @@ -330,6 +332,10 @@ public static class BriefBackupJobInfo { public List<BackupOdbcTableInfo> odbcTableList = Lists.newArrayList(); @SerializedName("odbc_resource_list") public List<BackupOdbcResourceInfo> odbcResourceList = Lists.newArrayList(); + @SerializedName("s3_resource_list") + public List<BackupS3ResourceInfo> s3ResourceList = Lists.newArrayList(); + @SerializedName("storage_policy_list") + public List<StoragePolicy> storagePolicyList = Lists.newArrayList(); public static BriefBackupJobInfo fromBackupJobInfo(BackupJobInfo backupJobInfo) { BriefBackupJobInfo briefBackupJobInfo = new BriefBackupJobInfo(); @@ -347,6 +353,8 @@ public static BriefBackupJobInfo fromBackupJobInfo(BackupJobInfo backupJobInfo) briefBackupJobInfo.viewList = backupJobInfo.newBackupObjects.views; briefBackupJobInfo.odbcTableList = backupJobInfo.newBackupObjects.odbcTables; briefBackupJobInfo.odbcResourceList = backupJobInfo.newBackupObjects.odbcResources; + briefBackupJobInfo.s3ResourceList = backupJobInfo.newBackupObjects.s3Resources; + briefBackupJobInfo.storagePolicyList = backupJobInfo.newBackupObjects.storagePolicies; return briefBackupJobInfo; } } @@ -365,6 +373,10 @@ public static class NewBackupObjects { public List<BackupOdbcTableInfo> odbcTables = Lists.newArrayList(); @SerializedName("odbc_resources") public List<BackupOdbcResourceInfo> odbcResources = Lists.newArrayList(); + @SerializedName("s3_resources") + public List<BackupS3ResourceInfo> s3Resources = Lists.newArrayList(); + @SerializedName("storage_policy") + public List<StoragePolicy> storagePolicies = Lists.newArrayList(); } public static class BackupOlapTableInfo { @@ -483,6 +495,11 @@ public static class BackupOdbcResourceInfo { public String name; } + public static class BackupS3ResourceInfo { + @SerializedName("name") + public String name; + } + // eg: __db_10001/__tbl_10002/__part_10003/__idx_10002/__10004 public String getFilePath(String db, String tbl, String part, String idx, long tabletId) { if (!db.equalsIgnoreCase(dbName)) { @@ -674,6 +691,19 @@ public static BackupJobInfo fromCatalog(long backupTime, String label, String db backupOdbcResourceInfo.name = odbcCatalogResource.getName(); jobInfo.newBackupObjects.odbcResources.add(backupOdbcResourceInfo); } + + if (resource instanceof S3Resource) { + S3Resource s3Resource = (S3Resource) resource; + BackupS3ResourceInfo backupS3ResourceInfo = new BackupS3ResourceInfo(); + backupS3ResourceInfo.name = s3Resource.getName(); + jobInfo.newBackupObjects.s3Resources.add(backupS3ResourceInfo); + } + } + + // storage policies + Collection<StoragePolicy> storagePolicies = backupMeta.getStoragePolicyNameMap().values(); + for (StoragePolicy storagePolicy : storagePolicies) { + jobInfo.newBackupObjects.storagePolicies.add(storagePolicy.clone()); } return jobInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java index 6a973ea45a2221..411ad7ddd580cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java @@ -17,11 +17,13 @@ package org.apache.doris.backup; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.Table; import org.apache.doris.common.io.Writable; import org.apache.doris.meta.MetaContext; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.policy.StoragePolicy; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; @@ -50,11 +52,13 @@ public class BackupMeta implements Writable { // resource name -> resource @SerializedName(value = "resourceNameMap") private Map<String, Resource> resourceNameMap = Maps.newHashMap(); + // storage policy name -> storage policy + private Map<String, StoragePolicy> storagePolicyNameMap = Maps.newHashMap(); private BackupMeta() { } - public BackupMeta(List<Table> tables, List<Resource> resources) { + public BackupMeta(List<Table> tables, List<Resource> resources, List<StoragePolicy> storagePolicies) { for (Table table : tables) { tblNameMap.put(table.getName(), table); tblIdMap.put(table.getId(), table); } @@ -62,6 +66,24 @@ public BackupMeta(List<Table> tables, List<Resource> resources) { for (Resource resource : resources) { resourceNameMap.put(resource.getName(), resource); } + + for (StoragePolicy policy : storagePolicies) { + storagePolicyNameMap.put(policy.getName(), policy); + + if (resourceNameMap.get(policy.getStorageResource()) != null) { + continue; + } + Resource resource = Env.getCurrentEnv().getResourceMgr() + .getResource(policy.getStorageResource()); + if (resource.getType() != Resource.ResourceType.S3) { + continue; + } + Resource copiedResource = resource.clone(); + if (copiedResource == null) { + continue; + } + resourceNameMap.put(policy.getStorageResource(), copiedResource); + } } public Map<String, Table> getTables() { @@ -72,6 +94,10 @@ public Map<String, Resource> getResourceNameMap() { return resourceNameMap; } + public Map<String, StoragePolicy> getStoragePolicyNameMap() { + return storagePolicyNameMap; + } + public Table getTable(String tblName) { return tblNameMap.get(tblName); }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 1db289dbaa9cb7..7e8ec6a1a1e7e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -47,6 +47,7 @@ import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.ResourceMgr; +import org.apache.doris.catalog.S3Resource; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; @@ -66,6 +67,11 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.property.S3ClientBEProperties; +import org.apache.doris.datasource.property.constants.S3Properties; +import org.apache.doris.policy.Policy; +import org.apache.doris.policy.PolicyMgr; +import org.apache.doris.policy.PolicyTypeEnum; +import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.Tag; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; @@ -94,6 +100,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Table.Cell; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -107,6 +114,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -121,6 +129,9 @@ public class RestoreJob extends AbstractJob { private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES; private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS; private static final String PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE; + private static final String PROP_STORAGE_RESOURCE = RestoreStmt.PROP_STORAGE_RESOURCE; + private static final String PROP_RESERVE_STORAGE_POLICY = RestoreStmt.PROP_RESERVE_STORAGE_POLICY; + private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__"; private static final Logger LOG = LogManager.getLogger(RestoreJob.class); @@ -167,6 +178,7 @@ public enum RestoreJobState { private List<Pair<String, Partition>> restoredPartitions = Lists.newArrayList(); private List<Table> restoredTbls = Lists.newArrayList(); private List<Resource> restoredResources = Lists.newArrayList(); + private List<StoragePolicy> storagePolicies = Lists.newArrayList(); // save all restored partitions' version info, which already exist in the catalog // table id -> partition id -> (version, version hash) @@ -193,7 +205,10 @@ public enum RestoreJobState { private boolean isCleanPartitions = false; // Whether to restore the data into a temp table, and then replace the origin one. private boolean isAtomicRestore = false; - + // the target storage resource + private String storageResource = ""; + // whether to reserve the storage policy + private boolean reserveStoragePolicy = false; // restore properties private Map<String, String> properties = Maps.newHashMap(); @@ -211,7 +226,8 @@ public RestoreJob(JobType jobType) { public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) { + boolean isCleanPartitions, boolean isAtomicRestore, String storageResource, + boolean reserveStoragePolicy, Env env, long repoId) { super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId); this.backupTimestamp = backupTs; this.jobInfo = jobInfo; @@ -229,21 +245,26 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu this.isCleanTables = isCleanTables; this.isCleanPartitions = isCleanPartitions; this.isAtomicRestore = isAtomicRestore; + this.storageResource = storageResource; + this.reserveStoragePolicy = reserveStoragePolicy; properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica)); properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable)); properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced)); properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables)); properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions)); properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore)); + properties.put(PROP_STORAGE_RESOURCE, storageResource); + properties.put(PROP_RESERVE_STORAGE_POLICY, String.valueOf(reserveStoragePolicy)); } public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) { + boolean isCleanPartitions, boolean isAtomicRestore, String storageResource, + boolean reserveStoragePolicy, Env env, long repoId, BackupMeta backupMeta) { this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica, - reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env, - repoId); + reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, + storageResource, reserveStoragePolicy, env, repoId); this.backupMeta = backupMeta; } @@ -624,6 +645,32 @@ private void checkAndPrepareMeta() { } } + for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo : jobInfo.newBackupObjects.s3Resources) { + Resource resource =
Env.getCurrentEnv().getResourceMgr().getResource(StringUtils.isNotEmpty(storageResource) + ? storageResource : backupS3ResourceInfo.name); + if (resource == null) { + continue; + } + if (resource.getType() != Resource.ResourceType.S3) { + status = new Status(ErrCode.COMMON_ERROR, + "The local resource " + resource.getName() + + " has the same name as the backed-up resource but a different type."); + return; + } + } + + for (StoragePolicy backupStoragePolicy : jobInfo.newBackupObjects.storagePolicies) { + String backupStoragePolicyName = backupStoragePolicy.getName(); + Optional<Policy> localPolicy = Env.getCurrentEnv().getPolicyMgr().findPolicy(backupStoragePolicyName, + PolicyTypeEnum.STORAGE); + if (localPolicy.isPresent() && localPolicy.get().getType() != PolicyTypeEnum.STORAGE) { + status = new Status(ErrCode.COMMON_ERROR, + "The local policy " + backupStoragePolicyName + + " has the same name as the backed-up policy but a different type."); + return; + } + } + // the new tablets -> { local tablet, schema hash, storage medium }, used in atomic restore. Map<Long, TabletRef> tabletBases = new HashMap<>(); @@ -680,6 +727,16 @@ private void checkAndPrepareMeta() { BackupPartitionInfo backupPartInfo = partitionEntry.getValue(); Partition localPartition = localOlapTbl.getPartition(partitionName); Partition remotePartition = remoteOlapTbl.getPartition(partitionName); + + String policyName = remoteOlapTbl.getPartitionInfo() + .getDataProperty(remotePartition.getId()).getStoragePolicy(); + if (StringUtils.isNotEmpty(policyName)) { + status = new Status(ErrCode.COMMON_ERROR, "Can't restore remote partition " + + partitionName + " in table " + remoteTbl.getName() + " with storage policy " + + policyName + " when local table " + localTbl.getName() + " exists." + + " Please drop the old table and restore again."); + return; + } if (localPartition != null) { // Partition already exists. PartitionInfo localPartInfo = localOlapTbl.getPartitionInfo(); @@ -766,7 +823,8 @@ private void checkAndPrepareMeta() { // reset all ids in this table String srcDbName = jobInfo.dbName; - Status st = remoteOlapTbl.resetIdsForRestore(env, db, replicaAlloc, reserveReplica, srcDbName); + Status st = remoteOlapTbl.resetIdsForRestore(env, db, replicaAlloc, + reserveReplica, reserveStoragePolicy, srcDbName); if (!st.ok()) { status = st; return; } @@ -855,6 +913,18 @@ private void checkAndPrepareMeta() { if (isAtomicRestore && !restoredPartitions.isEmpty()) { throw new RuntimeException("atomic restore is set, but the restored partitions is not empty"); } + + // check and restore resources + checkAndRestoreResources(); + if (!status.ok()) { + return; + } + // check and restore storage policies; this must run before createReplicas so the storage_policy_id is available + checkAndRestoreStoragePolicies(); + if (!status.ok()) { + return; + } + for (Pair<String, Partition> entry : restoredPartitions) { OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first); Preconditions.checkNotNull(localTbl, localTbl.getName()); @@ -908,11 +978,6 @@ private void checkAndPrepareMeta() { db.readUnlock(); } - // check and restore resources - checkAndRestoreResources(); - if (!status.ok()) { - return; - } LOG.debug("finished to restore resources. {}", this.jobId);
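Per the new partition check above, a policy-bearing snapshot cannot be restored on top of an existing table; the error message asks the user to drop the table first. A sketch of that sequence, with assumed names:

```sql
-- The local table must go away before a storage-policy restore can proceed.
DROP TABLE example_db.example_tbl;

RESTORE SNAPSHOT example_db.snapshot_label
FROM example_repo
ON (example_tbl)
PROPERTIES ("backup_timestamp" = "2024-11-12-11-23-32");
```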
{}", this.jobId); // Send create replica task to BE outside the db lock @@ -1206,7 +1271,7 @@ private void checkAndRestoreResources() { } else { try { // restore resource - resourceMgr.createResource(remoteOdbcResource, false); + resourceMgr.createResource(remoteOdbcResource); } catch (DdlException e) { status = new Status(ErrCode.COMMON_ERROR, e.getMessage()); return; @@ -1214,6 +1279,100 @@ private void checkAndRestoreResources() { restoredResources.add(remoteOdbcResource); } } + + if (!reserveStoragePolicy) { + return; + } + + for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo : jobInfo.newBackupObjects.s3Resources) { + String backupResourceName = backupS3ResourceInfo.name; + Resource localResource = resourceMgr.getResource(StringUtils.isNotEmpty(storageResource) + ? storageResource : backupResourceName); + S3Resource remoteS3Resource = (S3Resource) backupMeta.getResource(backupResourceName); + + if (StringUtils.isNotEmpty(storageResource)) { + if (localResource != null) { + if (localResource.getType() != Resource.ResourceType.S3) { + status = new Status(ErrCode.COMMON_ERROR, "The type of local resource " + + backupResourceName + " is not same as restored resource"); + return; + } + S3Resource localS3Resource = (S3Resource) localResource; + if (localS3Resource.getProperty(S3Properties.ENDPOINT) + .equals(remoteS3Resource.getProperty(S3Properties.ENDPOINT)) + && localS3Resource.getProperty(S3Properties.BUCKET) + .equals(remoteS3Resource.getProperty(S3Properties.BUCKET)) + && localS3Resource.getProperty(S3Properties.ROOT_PATH) + .equals(remoteS3Resource.getProperty(S3Properties.ROOT_PATH))) { + status = new Status(ErrCode.COMMON_ERROR, "local S3 resource " + + storageResource + " root path " + localS3Resource.getProperty(S3Properties.ROOT_PATH) + + " should not same as restored resource root path"); + return; + } + } else { + status = new Status(ErrCode.COMMON_ERROR, + "The local resource " + storageResource + " is not exist."); + return; + } + } else { + if (localResource != null) { + if (localResource.getType() != Resource.ResourceType.S3) { + status = new Status(ErrCode.COMMON_ERROR, "The type of local resource " + + backupResourceName + " is not same as restored resource"); + return; + } + S3Resource localS3Resource = (S3Resource) localResource; + if (localS3Resource.getSignature(BackupHandler.SIGNATURE_VERSION) + != remoteS3Resource.getSignature(BackupHandler.SIGNATURE_VERSION)) { + status = new Status(ErrCode.COMMON_ERROR, "S3 resource " + + jobInfo.getAliasByOriginNameIfSet(backupResourceName) + + " already exist but with different properties"); + return; + } + } else { + status = new Status(ErrCode.COMMON_ERROR, "Local resource " + + backupResourceName + " is not exist"); + return; + } + } + } + } + + private void checkAndRestoreStoragePolicies() { + if (!reserveStoragePolicy) { + return; + } + PolicyMgr policyMgr = Env.getCurrentEnv().getPolicyMgr(); + for (StoragePolicy backupStoargePolicy : jobInfo.newBackupObjects.storagePolicies) { + String backupStoragePoliceName = backupStoargePolicy.getName(); + Optional localPolicy = policyMgr.findPolicy(backupStoragePoliceName, + PolicyTypeEnum.STORAGE); + + // use specified storageResource + if (StringUtils.isNotEmpty(storageResource)) { + backupStoargePolicy.setStorageResource(storageResource); + } + if (localPolicy.isPresent()) { + StoragePolicy localStoargePolicy = (StoragePolicy) localPolicy.get(); + // storage policy name and resource name should be same + if 
(localStoargePolicy.getSignature(BackupHandler.SIGNATURE_VERSION) + != backupStoargePolicy.getSignature(BackupHandler.SIGNATURE_VERSION)) { + status = new Status(ErrCode.COMMON_ERROR, "Storage policy " + + jobInfo.getAliasByOriginNameIfSet(backupStoragePoliceName) + + " already exist but with different properties"); + return; + } + + } else { + // restore storage policy + try { + policyMgr.createStoragePolicy(backupStoargePolicy); + } catch (Exception e) { + LOG.error("restore user property fail should not happen", e); + } + storagePolicies.add(backupStoargePolicy); + } + } } private boolean genFileMappingWhenBackupReplicasEqual(PartitionInfo localPartInfo, Partition localPartition, @@ -1272,6 +1431,11 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica); + String storagePolicy = ""; + if (reserveStoragePolicy) { + storagePolicy = localTbl.getPartitionInfo() + .getDataProperty(restorePart.getId()).getStoragePolicy(); + } CreateReplicaTask task = new CreateReplicaTask(restoreReplica.getBackendId(), dbId, localTbl.getId(), restorePart.getId(), restoredIdx.getId(), restoreTablet.getId(), restoreReplica.getId(), indexMeta.getShortKeyColumnCount(), @@ -1284,7 +1448,8 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc localTbl.getPartitionInfo().getTabletType(restorePart.getId()), null, localTbl.getCompressionType(), - localTbl.getEnableUniqueKeyMergeOnWrite(), localTbl.getStoragePolicy(), + localTbl.getEnableUniqueKeyMergeOnWrite(), + storagePolicy, localTbl.disableAutoCompaction(), localTbl.enableSingleReplicaCompaction(), localTbl.skipWriteIndexOnLoad(), @@ -1302,7 +1467,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc if (baseTabletRef != null) { // ensure this replica is bound to the same backend disk as the origin table's replica. task.setBaseTablet(baseTabletRef.tabletId, baseTabletRef.schemaHash); - LOG.info("set base tablet {} for replica {} in restore job {}, tablet id={}", + LOG.info("set base tablet {} for replica {} in restore job {}, tablet id={},", baseTabletRef.tabletId, restoreReplica.getId(), jobId, restoreTablet.getId()); } batchTask.addTask(task); @@ -1485,6 +1650,22 @@ private void replayCheckAndPrepareMeta() { } } + // restored resource + ResourceMgr resourceMgr = Env.getCurrentEnv().getResourceMgr(); + for (Resource resource : restoredResources) { + resourceMgr.replayCreateResource(resource); + } + + // restored storage policy + PolicyMgr policyMgr = Env.getCurrentEnv().getPolicyMgr(); + for (StoragePolicy storagePolicy : storagePolicies) { + Optional localPolicy = policyMgr.findPolicy(storagePolicy.getPolicyName(), + PolicyTypeEnum.STORAGE); + if (!localPolicy.isPresent()) { + policyMgr.replayCreate(storagePolicy); + } + } + // restored partitions for (Pair entry : restoredPartitions) { OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first); @@ -1556,12 +1737,6 @@ private void replayCheckAndPrepareMeta() { } } - // restored resource - ResourceMgr resourceMgr = Env.getCurrentEnv().getResourceMgr(); - for (Resource resource : restoredResources) { - resourceMgr.replayCreateResource(resource); - } - LOG.info("replay check and prepare meta. 
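checkAndRestoreStoragePolicies() above accepts a pre-existing local policy only when its Adler32 signature (policy name, resource name, and type) matches the backed-up one. If an operator wants to pre-create the policy on the target cluster, it has to line up exactly; an assumed sketch:

```sql
-- A same-name local policy must reference the same resource,
-- or the signature comparison fails the restore job.
CREATE STORAGE POLICY cold_policy
PROPERTIES (
    "storage_resource" = "remote_s3",
    "cooldown_ttl" = "1d"
);
```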
{}", this); } @@ -2034,6 +2209,7 @@ private Status allTabletCommitted(boolean isReplay) { restoredPartitions.clear(); restoredTbls.clear(); restoredResources.clear(); + storagePolicies.clear(); // release snapshot before clearing snapshotInfos releaseSnapshots(); @@ -2289,6 +2465,13 @@ private void cancelInternal(boolean isReplay) { LOG.info("remove restored resource when cancelled: {}", resource.getName()); resourceMgr.dropResource(resource); } + + // remove restored storage policy + PolicyMgr policyMgr = Env.getCurrentEnv().getPolicyMgr(); + for (StoragePolicy storagePolicy : storagePolicies) { + LOG.info("remove restored storage polciy when cancelled: {}", storagePolicy.getName()); + policyMgr.replayDrop(storagePolicy); + } } if (!isReplay) { @@ -2657,6 +2840,8 @@ private void readOthers(DataInput in) throws IOException { isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES)); isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS)); isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE)); + storageResource = properties.get(PROP_STORAGE_RESOURCE); + reserveStoragePolicy = Boolean.parseBoolean(properties.get(PROP_RESERVE_STORAGE_POLICY)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 9b95b1b20ec6ec..68b8e8aaa92750 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -565,7 +565,7 @@ public void resetVersionForRestore() { } public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restoreReplicaAlloc, - boolean reserveReplica, String srcDbName) { + boolean reserveReplica, boolean reserveStoragePolicy, String srcDbName) { // ATTN: The meta of the restore may come from different clusters, so the // original ID in the meta may conflict with the ID of the new cluster. For // example, if a newly allocated ID happens to be the same as an original ID, @@ -614,7 +614,7 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore boolean isSinglePartition = partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST; partitionInfo.resetPartitionIdForRestore(partitionMap, - reserveReplica ? null : restoreReplicaAlloc, isSinglePartition); + reserveReplica ? null : restoreReplicaAlloc, reserveStoragePolicy, isSinglePartition); // for each partition, reset rollup index map Map nextIndexs = Maps.newHashMap(); @@ -1682,7 +1682,7 @@ public OlapTable selectiveCopy(Collection reservedPartitions, IndexExtSt // set storage medium to HDD for backup job, because we want that the backuped table // can be able to restored to another Doris cluster without SSD disk. // But for other operation such as truncate table, keep the origin storage medium. 
- copied.getPartitionInfo().setDataProperty(partition.getId(), new DataProperty(TStorageMedium.HDD)); + copied.getPartitionInfo().getDataProperty(partition.getId()).setStorageMedium(TStorageMedium.HDD); } for (MaterializedIndex idx : partition.getMaterializedIndices(extState)) { idx.setState(IndexState.NORMAL); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java index 8cf1d664f5293c..ea3baf81fbb606 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -351,7 +351,7 @@ public void moveFromTempToFormal(long tempPartitionId) { public void resetPartitionIdForRestore( Map<Long, Long> partitionIdMap, - ReplicaAllocation restoreReplicaAlloc, boolean isSinglePartitioned) { + ReplicaAllocation restoreReplicaAlloc, boolean reserveStoragePolicy, boolean isSinglePartitioned) { Map<Long, DataProperty> origIdToDataProperty = idToDataProperty; Map<Long, ReplicaAllocation> origIdToReplicaAllocation = idToReplicaAllocation; Map<Long, PartitionItem> origIdToItem = idToItem; @@ -364,7 +364,8 @@ public void resetPartitionIdForRestore( idToStoragePolicy = Maps.newHashMap(); for (Map.Entry<Long, Long> entry : partitionIdMap.entrySet()) { - idToDataProperty.put(entry.getKey(), origIdToDataProperty.get(entry.getValue())); + idToDataProperty.put(entry.getKey(), reserveStoragePolicy ? origIdToDataProperty.get(entry.getValue()) : + new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM)); idToReplicaAllocation.put(entry.getKey(), restoreReplicaAlloc == null ? origIdToReplicaAllocation.get(entry.getValue()) : restoreReplicaAlloc); @@ -372,7 +373,8 @@ public void resetPartitionIdForRestore( idToItem.put(entry.getKey(), origIdToItem.get(entry.getValue())); } idToInMemory.put(entry.getKey(), origIdToInMemory.get(entry.getValue())); - idToStoragePolicy.put(entry.getKey(), origIdToStoragePolicy.get(entry.getValue())); + idToStoragePolicy.put(entry.getKey(), reserveStoragePolicy + ? origIdToStoragePolicy.get(entry.getValue()) : ""); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java index b6631ab519b474..c33007b2a0a838 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java @@ -81,6 +81,13 @@ public void createResource(CreateResourceStmt stmt) throws DdlException { } } + public void createResource(Resource resource) throws DdlException { + if (createResource(resource, false)) { + Env.getCurrentEnv().getEditLog().logCreateResource(resource); + LOG.info("Create resource success. Resource: {}", resource.getName()); + } + } + // Return true if the resource is truly added, // otherwise, return false or throw an exception.
public boolean createResource(Resource resource, boolean ifNotExists) throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java index 1dd09cbf981621..5275d4888b6055 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java @@ -34,11 +34,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.zip.Adler32; /** * S3 resource */ @@ -126,7 +128,9 @@ private static boolean pingS3(CloudCredentialWithEndpoint credential, String buc Map<String, String> propertiesPing = new HashMap<>(); propertiesPing.put(S3Properties.Env.ACCESS_KEY, credential.getAccessKey()); propertiesPing.put(S3Properties.Env.SECRET_KEY, credential.getSecretKey()); - propertiesPing.put(S3Properties.Env.TOKEN, credential.getSessionToken()); + if (credential.getSessionToken() != null) { + propertiesPing.put(S3Properties.Env.TOKEN, credential.getSessionToken()); + } propertiesPing.put(S3Properties.Env.ENDPOINT, credential.getEndpoint()); propertiesPing.put(S3Properties.Env.REGION, credential.getRegion()); propertiesPing.put(PropertyConverter.USE_PATH_STYLE, @@ -250,4 +254,39 @@ protected void getProcNodeData(BaseProcResult result) { } readUnlock(); } + + public int getSignature(int signatureVersion) { + Adler32 adler32 = new Adler32(); + adler32.update(signatureVersion); + final String charsetName = "UTF-8"; + + try { + // resource name + adler32.update(name.getBytes(charsetName)); + if (LOG.isDebugEnabled()) { + LOG.debug("signature. resource name: {}", name); + } + // type + adler32.update(type.name().getBytes(charsetName)); + if (LOG.isDebugEnabled()) { + LOG.debug("signature. resource type: {}", type.name()); + } + // configs + for (Map.Entry<String, String> config : properties.entrySet()) { + adler32.update(config.getKey().getBytes(charsetName)); + adler32.update(config.getValue().getBytes(charsetName)); + if (LOG.isDebugEnabled()) { + LOG.debug("signature. resource config: {}", config); + } + } + } catch (UnsupportedEncodingException e) { + LOG.error("encoding error", e); + return -1; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("signature: {}", Math.abs((int) adler32.getValue())); + } + return Math.abs((int) adler32.getValue()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java b/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java index 9b58e5b4d99512..88537630dd87f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java @@ -61,6 +61,11 @@ public class DropPolicyLog implements Writable { @SerializedName(value = "roleName") private String roleName; + public DropPolicyLog(PolicyTypeEnum type, String policyName) { + this.type = type; + this.policyName = policyName; + } + /** * Generate delete logs through stmt.
     **/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
index b06cd19d0cf8c2..b1052ce2e1cb6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
@@ -102,6 +102,10 @@ public Policy(long id, final PolicyTypeEnum type, final String policyName) {
         this.version = 0;
     }
 
+    public String getName() {
+        return policyName;
+    }
+
     /**
      * Trans stmt to Policy.
      **/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
index 7ca5d4fcbeac76..1d7a1061a57e4d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
@@ -117,6 +117,24 @@ public void createDefaultStoragePolicy() {
         LOG.info("Create default storage success.");
     }
 
+    /**
+     * Create policy through StoragePolicy.
+     **/
+    public void createStoragePolicy(StoragePolicy storagePolicy) throws UserException {
+        Map<String, String> props = Maps.newConcurrentMap();
+        if (storagePolicy.getCooldownTimestampMs() != -1) {
+            props.put(StoragePolicy.COOLDOWN_DATETIME, String.valueOf(storagePolicy.getCooldownTimestampMs()));
+        }
+        if (storagePolicy.getCooldownTtl() != -1) {
+            props.put(StoragePolicy.COOLDOWN_TTL, String.valueOf(storagePolicy.getCooldownTtl()));
+        }
+        props.put(StoragePolicy.STORAGE_RESOURCE, storagePolicy.getStorageResource());
+
+        CreatePolicyStmt stmt = new CreatePolicyStmt(storagePolicy.getType(), true,
+                storagePolicy.getPolicyName(), props);
+        createPolicy(stmt);
+    }
+
     /**
      * Create policy through stmt.
      **/
@@ -247,7 +265,7 @@ public List<Policy> getCopiedPoliciesByType(PolicyTypeEnum policyType) {
         }
     }
 
-    private List<Policy> getPoliciesByType(PolicyTypeEnum policyType) {
+    public List<Policy> getPoliciesByType(PolicyTypeEnum policyType) {
         if (typeToPolicyMap == null) {
             return new ArrayList<>();
         }
@@ -275,6 +293,11 @@ private void unprotectedAdd(Policy policy) {
     }
 
+    public void replayDrop(StoragePolicy policy) {
+        DropPolicyLog log = new DropPolicyLog(policy.getType(), policy.getPolicyName());
+        replayDrop(log);
+    }
+
     public void replayDrop(DropPolicyLog log) {
         unprotectedDrop(log);
         LOG.info("replay drop policy log: {}", log);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
index e18495c50d0fd5..72c99484dd279c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
@@ -24,8 +24,12 @@
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ShowResultSetMetaData;
 
 import com.google.common.base.Strings;
@@ -36,18 +40,22 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeParseException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.zip.Adler32;
 
 /**
  * Save policy for storage migration.
  **/
 @Data
-public class StoragePolicy extends Policy {
+public class StoragePolicy extends Policy implements Writable, GsonPostProcessable {
     public static final String DEFAULT_STORAGE_POLICY_NAME = "default_storage_policy";
 
     public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName, Optional<Policy> defaultPolicy)
@@ -381,4 +389,51 @@ public boolean removeResourceReference() {
         }
         return false;
     }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    /**
+     * Read Policy from file.
+     **/
+    public static StoragePolicy read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, StoragePolicy.class);
+    }
+
+    public int getSignature(int signatureVersion) {
+        Adler32 adler32 = new Adler32();
+        adler32.update(signatureVersion);
+        final String charsetName = "UTF-8";
+
+        // id, version, cooldownTimestampMs and cooldownTtl are deliberately excluded
+        try {
+            // policy name
+            adler32.update(policyName.getBytes(charsetName));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("signature. policy name: {}", policyName);
+            }
+            // storageResource name
+            adler32.update(storageResource.getBytes(charsetName));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("signature. storageResource name: {}", storageResource);
+            }
+            // type
+            adler32.update(String.valueOf(getType()).getBytes(charsetName));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("signature. type: {}", getType());
+            }
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("encoding error", e);
+            return -1;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("signature: {}", Math.abs((int) adler32.getValue()));
+        }
+        return Math.abs((int) adler32.getValue());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d83ff7e08156d1..ea7348db7f0ffc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -29,6 +29,8 @@
 import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TypeDef;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.backup.AbstractJob;
+import org.apache.doris.backup.BackupJob;
 import org.apache.doris.backup.Snapshot;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
@@ -277,6 +279,17 @@ public TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(TConfirmUnusedRe
                 LOG.warn("tablet {} not found", info.tablet_id);
                 return;
             }
+
+            AbstractJob job = Env.getCurrentEnv().getBackupHandler().getJob(tabletMeta.getDbId());
+            if (job instanceof BackupJob) {
+                BackupJob backupJob = (BackupJob) job;
+                if (backupJob.isDone()
+                        && backupJob.getBackupMeta().getTable(tabletMeta.getTableId()) != null) {
+                    LOG.warn("tablet {} is referenced by a backup job, skip it", info.tablet_id);
+                    return;
+                }
+            }
+
             Tablet tablet;
             int replicaNum;
             try {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
index 97e689b697256c..5258aea0815524 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
@@ -43,6 +43,7 @@
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.policy.StoragePolicy;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.SnapshotTask;
@@ -212,7 +213,8 @@ public Status getSnapshotInfoFile(String label, String backupTimestamp, List
                 List<Table> tbls = Lists.newArrayList();
                 tbls.add(tbl);
                 List<Resource> resources = Lists.newArrayList();
-                BackupMeta backupMeta = new BackupMeta(tbls, resources);
+                List<StoragePolicy> storagePolicies = Lists.newArrayList();
+                BackupMeta backupMeta = new BackupMeta(tbls, resources, storagePolicies);
                 Map<Long, SnapshotInfo> snapshotInfos = Maps.newHashMap();
                 for (Partition part : tbl.getPartitions()) {
                     for (MaterializedIndex idx : part.getMaterializedIndices(IndexExtState.VISIBLE)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 7e8e55eea327c4..4904f26ca7f4a4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -42,6 +42,7 @@
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.policy.StoragePolicy;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
@@ -251,13 +252,14 @@ boolean await(long timeout, TimeUnit unit) {
         db.dropTable(expectedRestoreTbl.getName());
 
         job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false,
-                new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false,
+                new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, null, false,
                 env, repo.getId());
 
        List<Table> tbls = Lists.newArrayList();
        List<Resource> resources = Lists.newArrayList();
+        List<StoragePolicy> storagePolicies = Lists.newArrayList();
        tbls.add(expectedRestoreTbl);
-        backupMeta = new BackupMeta(tbls, resources);
+        backupMeta = new BackupMeta(tbls, resources, storagePolicies);
     }
 
     @Test
diff --git a/regression-test/suites/backup_restore/test_backup_restore_cold_data.groovy b/regression-test/suites/backup_restore/test_backup_restore_cold_data.groovy
new file mode 100644
index 00000000000000..84345575d8d3fe
--- /dev/null
+++ b/regression-test/suites/backup_restore/test_backup_restore_cold_data.groovy
@@ -0,0 +1,1506 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_cooldown", "backup_cooldown_data") {
+
+    String suiteName = "test_backup_cooldown"
+    String resource_name1 = "resource_${suiteName}_1"
+    String policy_name1 = "policy_${suiteName}_1"
+    String resource_name2 = "resource_${suiteName}_2"
+    String policy_name2 = "policy_${suiteName}_2"
+    String dbName = "${suiteName}_db"
+    String tableName = "${suiteName}_table"
+    String snapshotName = "${suiteName}_snapshot"
+    String repoName = "${suiteName}_repo"
+
+    def syncer = getSyncer()
+    syncer.createS3Repository(repoName)
+
+    sql """
+        CREATE RESOURCE IF NOT EXISTS "${resource_name1}"
+        PROPERTIES(
+            "type"="s3",
+            "AWS_ENDPOINT" = "${getS3Endpoint()}",
+            "AWS_REGION" = "${getS3Region()}",
+            "AWS_ROOT_PATH" = "regression/cooldown1",
+            "AWS_ACCESS_KEY" = "${getS3AK()}",
+            "AWS_SECRET_KEY" = "${getS3SK()}",
+            "AWS_MAX_CONNECTIONS" = "50",
+            "AWS_REQUEST_TIMEOUT_MS" = "3000",
+            "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+            "AWS_BUCKET" = "${getS3BucketName()}",
+            "s3_validity_check" = "true"
+        );
+    """
+
+    sql """
+        CREATE RESOURCE IF NOT EXISTS "${resource_name2}"
+        PROPERTIES(
+            "type"="s3",
+            "AWS_ENDPOINT" = "${getS3Endpoint()}",
+            "AWS_REGION" = "${getS3Region()}",
+            "AWS_ROOT_PATH" = "regression/cooldown2",
+            "AWS_ACCESS_KEY" = "${getS3AK()}",
+            "AWS_SECRET_KEY" = "${getS3SK()}",
+            "AWS_MAX_CONNECTIONS" = "50",
+            "AWS_REQUEST_TIMEOUT_MS" = "3000",
+            "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+            "AWS_BUCKET" = "${getS3BucketName()}",
+            "s3_validity_check" = "true"
+        );
+    """
+
+    sql """
+        CREATE STORAGE POLICY IF NOT EXISTS ${policy_name1}
+        PROPERTIES(
+            "storage_resource" = "${resource_name1}",
+            "cooldown_ttl" = "10"
+        )
+    """
+
+    sql """
+        CREATE STORAGE POLICY IF NOT EXISTS ${policy_name2}
+        PROPERTIES(
+            "storage_resource" = "${resource_name2}",
+            "cooldown_ttl" = "10"
+        )
+    """
+
+    // generate_cooldown_task_interval_sec defaults to 20
+
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+
+    sql """
+        CREATE TABLE ${dbName}.${tableName}
+        (
+            k1 BIGINT,
+            v1 VARCHAR(48),
+            INDEX idx1 (v1) USING INVERTED PROPERTIES("parser" = "english")
+        )
+        DUPLICATE KEY(k1)
+        PARTITION BY RANGE(`k1`)
+        (
+            PARTITION p201701 VALUES [(0), (3)) ("storage_policy" = "${policy_name1}"),
+            PARTITION `p201702` VALUES LESS THAN (6) ("storage_policy" = "${policy_name2}"),
+            PARTITION `p2018` VALUES [(6), (100))
+        )
+        DISTRIBUTED BY HASH (k1) BUCKETS 3
+        PROPERTIES(
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    List<String> values = []
+    for (int i = 1; i <= 10; ++i) {
+        values.add("(${i}, ${i})")
+    }
+    sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
+    def result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // wait for the data to cool down
+    result = sql "show data FROM ${dbName}.${tableName}"
+    def sqlResult = result[0][5].toString();
+    int count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+
+    assertNotEquals('0.000 ', result[0][5].toString())
+
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName}
+        TO `${repoName}`
+        ON (${tableName})
+    """
+
+    syncer.waitSnapshotFinish(dbName)
+
+    def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+    assertTrue(snapshot != null)
+
+    sql "DROP TABLE ${dbName}.${tableName}"
+
+    sql """
+        drop storage policy ${policy_name1};
+    """
+
+    sql """
+        drop storage policy ${policy_name2};
+    """
+
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // wait for the restored data to cool down again
+    result = sql "show data FROM ${dbName}.${tableName}"
+    sqlResult = result[0][5].toString();
+    count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+
+    // cleanup
+    sql "DROP TABLE ${dbName}.${tableName} FORCE"
+    sql "DROP DATABASE ${dbName} FORCE"
+    sql "DROP REPOSITORY `${repoName}`"
+
+    sql """
+        drop storage policy ${policy_name1};
+    """
+
+    sql """
+        drop resource ${resource_name1};
+    """
+
+    sql """
+        drop storage policy ${policy_name2};
+    """
+
+    sql """
+        drop resource ${resource_name2};
+    """
+}
+
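Note: the `show data` polling loop above is repeated verbatim by every scenario in this suite file. A minimal Groovy sketch of a shared helper (hypothetical name `waitForCooldown`; it assumes the regression framework's `sql` binding and the `show data` column layout used in this suite) that those loops could call instead:

    // Hypothetical helper, not part of the patch: poll `show data` until the
    // size column (index 5) stops reporting "0.00", for at most ~30 minutes.
    def waitForCooldown = { String db, String table ->
        int count = 0
        def size = sql("show data FROM ${db}.${table}")[0][5].toString()
        while (size.contains("0.00")) {
            if (++count >= 360) { // 360 * 5s = 30min
                throw new Exception("cooldown task timed out after 30 mins")
            }
            Thread.sleep(5000)
            size = sql("show data FROM ${db}.${table}")[0][5].toString()
        }
    }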
+// test restore back to old instance
+suite("test_backup_cooldown_1", "backup_cooldown_data") {
+
+    String suiteName = "test_backup_cooldown_1"
+    String resource_name1 = "resource_${suiteName}_1"
+    String policy_name1 = "policy_${suiteName}_1"
+    String resource_name2 = "resource_${suiteName}_2"
+    String policy_name2 = "policy_${suiteName}_2"
+    String dbName = "${suiteName}_db"
+    String tableName = "${suiteName}_table"
+    String snapshotName = "${suiteName}_snapshot"
+    String repoName = "${suiteName}_repo"
+    def found = 0
+    def records
+    def result
+    def row
+
+    def syncer = getSyncer()
+    syncer.createS3Repository(repoName)
+
+    sql """
+        CREATE RESOURCE IF NOT EXISTS "${resource_name1}"
+        PROPERTIES(
+            "type"="s3",
+            "AWS_ENDPOINT" = "${getS3Endpoint()}",
+            "AWS_REGION" = "${getS3Region()}",
+            "AWS_ROOT_PATH" = "regression/cooldown1",
+            "AWS_ACCESS_KEY" = "${getS3AK()}",
+            "AWS_SECRET_KEY" = "${getS3SK()}",
+            "AWS_MAX_CONNECTIONS" = "50",
+            "AWS_REQUEST_TIMEOUT_MS" = "3000",
+            "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+            "AWS_BUCKET" = "${getS3BucketName()}",
+            "s3_validity_check" = "true"
+        );
+    """
+
+    sql """
+        CREATE RESOURCE IF NOT EXISTS "${resource_name2}"
+        PROPERTIES(
+            "type"="s3",
+            "AWS_ENDPOINT" = "${getS3Endpoint()}",
+            "AWS_REGION" = "${getS3Region()}",
+            "AWS_ROOT_PATH" = "regression/cooldown2",
+            "AWS_ACCESS_KEY" = "${getS3AK()}",
+            "AWS_SECRET_KEY" = "${getS3SK()}",
+            "AWS_MAX_CONNECTIONS" = "50",
+            "AWS_REQUEST_TIMEOUT_MS" = "3000",
+            "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+            "AWS_BUCKET" = "${getS3BucketName()}",
+            "s3_validity_check" = "true"
+        );
+    """
+
+    sql """
+        CREATE STORAGE POLICY IF NOT EXISTS ${policy_name1}
+        PROPERTIES(
+            "storage_resource" = "${resource_name1}",
+            "cooldown_ttl" = "10"
+        )
+    """
+
+    sql """
+        CREATE STORAGE POLICY IF NOT EXISTS ${policy_name2}
+        PROPERTIES(
+            "storage_resource" = "${resource_name2}",
+            "cooldown_ttl" = "10"
+        )
+    """
+
+    // generate_cooldown_task_interval_sec defaults to 20
+
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+
+    sql """
+        CREATE TABLE ${dbName}.${tableName}
+        (
+            k1 BIGINT,
+            v1 VARCHAR(48),
+            INDEX idx1 (v1) USING INVERTED PROPERTIES("parser" = "english")
+        )
+        DUPLICATE KEY(k1)
+        PARTITION BY RANGE(`k1`)
+        (
+            PARTITION p201701 VALUES [(0), (3)) ("storage_policy" = "${policy_name1}"),
+            PARTITION `p201702` VALUES LESS THAN (6) ("storage_policy" = "${policy_name2}"),
+            PARTITION `p2018` VALUES [(6), (100))
+        )
+        DISTRIBUTED BY HASH (k1) BUCKETS 3
+        PROPERTIES(
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    List<String> values = []
+    for (int i = 1; i <= 10; ++i) {
+        values.add("(${i}, ${i})")
+    }
+    sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // wait for the data to cool down
+    result = sql "show data FROM ${dbName}.${tableName}"
+    def sqlResult = result[0][5].toString();
+    int count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+
+    assertNotEquals('0.000 ', result[0][5].toString())
+
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName}
+        TO `${repoName}`
+        ON (${tableName})
+    """
+
+    syncer.waitSnapshotFinish(dbName)
+
+    def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+    assertTrue(snapshot != null)
+
+    // Scenario matrix:
+    // 1 the old table still exists
+    //   1.1 restore with no extra property: expected to fail -- restoring a table with cooldown data onto an existing table is not supported
+    //   1.2 restore with ("reserve_storage_policy"="true"): expected to fail for the same reason
+    //   1.3 restore with ("reserve_storage_policy"="false"): also fails while the old table exists (the assertions below expect the same error)
+    // 2 the old table is dropped
+    //   2.1 restore with no extra property: expected to succeed, data cools down
+    //   2.2 restore with ("reserve_storage_policy"="true"): expected to succeed, data cools down
+    //   2.3 restore with ("reserve_storage_policy"="false"): expected to succeed, data does not cool down
+    // 3 the old table and the storage policy are dropped
+    //   3.1/3.2/3.3: same expectations as scenario 2
+    // 4 the old table, storage policy and resource are dropped
+    //   4.1 restore with no extra property: expected to fail, the resource does not exist
+    //   4.2 restore with ("reserve_storage_policy"="true"): expected to fail, the resource does not exist
+    //   4.3 restore with ("reserve_storage_policy"="false"): expected to succeed, data does not cool down
+
+    // 1. the old table still exists
+    logger.info(" ====================================== 1.1 ==================================== ")
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    // the restore is expected to fail
+    records = sql_return_maparray "SHOW restore FROM ${dbName}"
+    row = records[records.size() - 1]
+    assertTrue(row.Status.contains("Can't restore remote partition"))
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    logger.info(" ====================================== 1.2 ==================================== ")
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "reserve_storage_policy"="true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    // the restore is expected to fail
+    records = sql_return_maparray "SHOW restore FROM ${dbName}"
+    row = records[records.size() - 1]
+    assertTrue(row.Status.contains("Can't restore remote partition"))
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    logger.info(" ====================================== 1.3 ==================================== ")
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "reserve_storage_policy"="false"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    // the restore is expected to fail while the old table exists
+    records = sql_return_maparray "SHOW restore FROM ${dbName}"
+    row = records[records.size() - 1]
+    assertTrue(row.Status.contains("Can't restore remote partition"))
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // 2. the old table is dropped
+    logger.info(" ====================================== 2.1 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // wait for the data to cool down
+    result = sql "show data FROM ${dbName}.${tableName}"
+    sqlResult = result[0][5].toString();
+    count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+    assertNotEquals('0.000 ', result[0][5].toString())
+
+    logger.info(" ====================================== 2.2 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "reserve_storage_policy"="true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // wait for the data to cool down
+    result = sql "show data FROM ${dbName}.${tableName}"
+    sqlResult = result[0][5].toString();
+    count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+    assertNotEquals('0.000 ', result[0][5].toString())
+
+    logger.info(" ====================================== 2.3 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "reserve_storage_policy"="false"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // check that the table has no storage policy
+    records = sql_return_maparray "show storage policy using"
+    found = 0
+    for (def res2 : records) {
+        if (res2.Database.equals(dbName) && res2.Table.equals(tableName)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 0)
+
+    // 3. the old table and the storage policy are dropped
+    logger.info(" ====================================== 3.1 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // wait for the data to cool down
+    result = sql "show data FROM ${dbName}.${tableName}"
+    sqlResult = result[0][5].toString();
+    count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+    assertNotEquals('0.000 ', result[0][5].toString())
+
+    logger.info(" ====================================== 3.2 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "reserve_storage_policy"="true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // wait for the data to cool down
+    result = sql "show data FROM ${dbName}.${tableName}"
+    sqlResult = result[0][5].toString();
+    count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+    assertNotEquals('0.000 ', result[0][5].toString())
+
+    logger.info(" ====================================== 3.3 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "reserve_storage_policy"="false"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // check that the table has no storage policy
+    records = sql_return_maparray "show storage policy using"
+    found = 0
+    for (def res2 : records) {
+        if (res2.Database.equals(dbName) && res2.Table.equals(tableName)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 0)
+
+    // check that storage policy ${policy_name1} no longer exists
+    records = sql_return_maparray "show storage policy"
+    found = 0
+    for (def res2 : records) {
+        if (res2.PolicyName.equals(policy_name1)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 0)
+
+    // check that no storage policy references ${resource_name1}
+    records = sql_return_maparray "show storage policy"
+    found = 0
+    for (def res2 : records) {
+        if (res2.StorageResource.equals(resource_name1)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 0)
+
+    // 4. the old table, storage policy and resource are dropped
+    logger.info(" ====================================== 4.1 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    try_sql """
+        drop resource ${resource_name1};
+    """
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    // restore fails because the referenced resource does not exist
+    records = sql_return_maparray "SHOW restore FROM ${dbName}"
+    row = records[records.size() - 1]
+    assertTrue(row.Status.contains("is not exist"))
+
+    logger.info(" ====================================== 4.2 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    try_sql """
+        drop resource ${resource_name1};
+    """
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "reserve_storage_policy"="true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    // restore fails because the referenced resource does not exist
+    records = sql_return_maparray "SHOW restore FROM ${dbName}"
+    row = records[records.size() - 1]
+    assertTrue(row.Status.contains("is not exist"))
+
+    logger.info(" ====================================== 4.3 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    try_sql """
+        drop resource ${resource_name1};
+    """
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "reserve_storage_policy"="false"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // check that the table has no storage policy
+    records = sql_return_maparray "show storage policy using"
+    found = 0
+    for (def res2 : records) {
+        if (res2.Database.equals(dbName) && res2.Table.equals(tableName)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 0)
+
+    // check that storage policy ${policy_name1} no longer exists
+    records = sql_return_maparray "show storage policy"
+    found = 0
+    for (def res2 : records) {
+        if (res2.PolicyName.equals(policy_name1)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 0)
+
+    // check that no storage policy references ${resource_name1}
+    records = sql_return_maparray "show storage policy"
+    found = 0
+    for (def res2 : records) {
+        if (res2.StorageResource.equals(resource_name1)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 0)
+
+    // 5. alter the policy, then restore succeeds
+    logger.info(" ====================================== 5.1 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop resource ${resource_name1};
+    """
+    sql """
+        CREATE RESOURCE IF NOT EXISTS "${resource_name1}"
+        PROPERTIES(
+            "type"="s3",
+            "AWS_ENDPOINT" = "${getS3Endpoint()}",
+            "AWS_REGION" = "${getS3Region()}",
+            "AWS_ROOT_PATH" = "regression/cooldown1",
+            "AWS_ACCESS_KEY" = "${getS3AK()}",
+            "AWS_SECRET_KEY" = "${getS3SK()}",
+            "AWS_MAX_CONNECTIONS" = "50",
+            "AWS_REQUEST_TIMEOUT_MS" = "3000",
+            "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+            "AWS_BUCKET" = "${getS3BucketName()}",
+            "s3_validity_check" = "true"
+        );
+    """
+    sql """
+        ALTER STORAGE POLICY ${policy_name2} PROPERTIES ("cooldown_ttl" = "11");
+    """
+
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // wait for the data to cool down
+    result = sql "show data FROM ${dbName}.${tableName}"
+    sqlResult = result[0][5].toString();
+    count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+    assertNotEquals('0.000 ', result[0][5].toString())
+
+    // check that storage policy ${policy_name2} exists with the altered TTL
+    records = sql_return_maparray "show storage policy"
+    found = 0
+    for (def res2 : records) {
+        if (res2.PolicyName.equals(policy_name2) && res2.CooldownTtl.equals("11")) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 1)
+
+    // cleanup
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE"
+    sql "DROP DATABASE ${dbName} FORCE"
+    sql "DROP REPOSITORY `${repoName}`"
+
+    sql """
+        drop storage policy ${policy_name1};
+    """
+
+    sql """
+        drop resource ${resource_name1};
+    """
+
+    sql """
+        drop storage policy ${policy_name2};
+    """
+
+    sql """
+        drop resource ${resource_name2};
+    """
+}
+
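Note: the `show storage policy [using]` scans in these suites all follow the same find-a-matching-row pattern. A compact Groovy sketch (hypothetical helper names; it assumes the framework's `sql_return_maparray` and the column names used above) of predicates the assertions could share:

    // Hypothetical helpers, not part of the patch.
    def tableUsesPolicy = { String db, String table ->
        sql_return_maparray("show storage policy using").any {
            it.Database == db && it.Table == table
        }
    }
    def policyPointsTo = { String policy, String resource ->
        sql_return_maparray("show storage policy").any {
            it.PolicyName == policy && it.StorageResource == resource
        }
    }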
"AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown2", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_new_name}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown3", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name1} + PROPERTIES( + "storage_resource" = "${resource_name1}", + "cooldown_ttl" = "10" + ) + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name2} + PROPERTIES( + "storage_resource" = "${resource_name2}", + "cooldown_ttl" = "10" + ) + """ + + //generate_cooldown_task_interval_sec default is 20 + + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + + sql """ + CREATE TABLE ${dbName}.${tableName} + ( + k1 BIGINT, + v1 VARCHAR(48), + INDEX idx1 (v1) USING INVERTED PROPERTIES("parser" = "english") + ) + DUPLICATE KEY(k1) + PARTITION BY RANGE(`k1`) + ( + PARTITION p201701 VALUES [(0), (3)) ("storage_policy" = "${policy_name1}"), + PARTITION `p201702` VALUES LESS THAN (6)("storage_policy" = "${policy_name2}"), + PARTITION `p2018` VALUES [(6),(100)) + ) + DISTRIBUTED BY HASH (k1) BUCKETS 3 + PROPERTIES( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + List values = [] + for (int i = 1; i <= 10; ++i) { + values.add("(${i}, ${i})") + } + sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}" + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + int count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + assertNotEquals('0.000 ', result[0][5].toString()) + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableName}) + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + // 1 老表存在的情况 + // 1.1 restore 指定 ("storage_resource"="resource_name_exist"), 预期失败,不支持将冷热属性的表恢复到已存在的表中。 + // 1.2 restore 指定 ("storage_resource"="resource_name_not_exist"), 预期失败,resource不存在 + // 1.3 restore 指定 ("storage_resource"="resource_new_name"), 预期失败,不支持将冷热属性的表恢复到已存在的表中。 + + + // 2 删除表 + // 2.1 restore 指定 ("storage_resource"="resource_name_exist"), 预期失败,resource路径需不一致 + // 2.2 restore 指定 ("storage_resource"="resource_name_not_exist"), 预期失败,resource不存在 + // 2.3 restore 指定 ("storage_resource"="resource_new_name"), storage policy 存在失败 + + + // 3 删除表和policy + // 3.1 restore 指定 ("storage_resource"="resource_name_not_exist"), 预期失败,resource不存在 + // 3.2 restore 指定 
("storage_resource"="resource_new_name"), 成功 + + + + // 4 删除表和policy 同时指定storage_resource和reserve_storage_policy + // 4.1 restore 指定 ("storage_resource"="resource_name_not_exist", "reserve_storage_policy"="true"), 预期失败,resource不存在 + // 4.2 restore 指定 ("storage_resource"="resource_name_not_exist", "reserve_storage_policy"="false"), 预期失败,resource不存在 + // 4.3 restore 指定 ("storage_resource"="resource_new_name", "reserve_storage_policy"="true"), 预期成功,且落冷 + // 4.4 restore 指定 ("storage_resource"="resource_new_name", "reserve_storage_policy"="false"), 预期成功,且不落冷 + + + + // 1 old table exist + // 1.1 restore with ("storage_resource"="resource_name1") fail + // 1.2 restore with ("storage_resource"="resource_name_not_exist") fail + // 1.3 restore with ("storage_resource"="resource_new_name") fail + logger.info(" ====================================== 1.1 ==================================== ") + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_name1}" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("Can't restore remote partition")) + + + + + logger.info(" ====================================== 1.2 ==================================== ") + def fail_restore_1 = try_sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="resource_name_not_exist" + ) + """ + + logger.info("fail_restore_1: ${fail_restore_1}") + + assertEquals(fail_restore_1, null) + + logger.info(" ====================================== 1.3 ==================================== ") + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_new_name}" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("Can't restore remote partition")) + + + + + // 2 drop old table + // 2.1 restore with ("storage_resource"="resource_name_exist")fail + // 2.2 restore with ("storage_resource"="resource_name_not_exist") fail + // 2.3 restore with ("storage_resource"="resource_new_name")fail + logger.info(" ====================================== 2.1 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_name1}" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("should not same as restored resource root path")) + + + + + logger.info(" ====================================== 2.2 
+    logger.info(" ====================================== 2.2 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    def fail_restore_2 = try_sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "storage_resource"="resource_name_not_exist"
+        )
+    """
+
+    assertEquals(fail_restore_2, null)
+
+    logger.info(" ====================================== 2.3 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "storage_resource"="${resource_new_name}"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    // the restore is expected to fail: the old policy still exists with different properties
+    records = sql_return_maparray "SHOW restore FROM ${dbName}"
+    row = records[records.size() - 1]
+    assertTrue(row.Status.contains("already exist but with different properties"))
+
+    // 3. the table and the storage policies are dropped
+    logger.info(" ====================================== 3.1 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    try_sql """
+        drop storage policy ${policy_name2};
+    """
+    def fail_restore_3 = try_sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "storage_resource"="resource_name_not_exist"
+        )
+    """
+
+    assertEquals(fail_restore_3, null)
+
+    logger.info(" ====================================== 3.2 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    try_sql """
+        drop storage policy ${policy_name2};
+    """
+
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "storage_resource"="${resource_new_name}"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // wait for the data to cool down
+    result = sql "show data FROM ${dbName}.${tableName}"
+    sqlResult = result[0][5].toString();
+    count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+    assertNotEquals('0.000 ', result[0][5].toString())
+
+    // check that ${policy_name1} now points at ${resource_new_name}
+    records = sql_return_maparray "show storage policy"
+    found = 0
+    for (def res2 : records) {
+        if (res2.StorageResource.equals(resource_new_name) && res2.PolicyName.equals(policy_name1)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 1)
+
+    // check that ${policy_name2} now points at ${resource_new_name}
+    records = sql_return_maparray "show storage policy"
+    found = 0
+    for (def res2 : records) {
+        if (res2.StorageResource.equals(resource_new_name) && res2.PolicyName.equals(policy_name2)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 1)
+
+    // 4. the table and the policies are dropped, both storage_resource and reserve_storage_policy set
+    logger.info(" ====================================== 4.1 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    try_sql """
+        drop storage policy ${policy_name2};
+    """
+    def fail_restore_4 = try_sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "storage_resource"="resource_name_not_exist",
+            "reserve_storage_policy"="true"
+        )
+    """
+
+    assertEquals(fail_restore_4, null)
+
+    logger.info(" ====================================== 4.2 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    try_sql """
+        drop storage policy ${policy_name2};
+    """
+
+    def fail_restore_5 = try_sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "storage_resource"="resource_name_not_exist",
+            "reserve_storage_policy"="false"
+        )
+    """
+
+    assertEquals(fail_restore_5, null)
+
+    logger.info(" ====================================== 4.3 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    try_sql """
+        drop storage policy ${policy_name2};
+    """
+
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "storage_resource"="${resource_new_name}",
+            "reserve_storage_policy"="true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // check that ${policy_name1} now points at ${resource_new_name}
+    records = sql_return_maparray "show storage policy"
+    found = 0
+    for (def res2 : records) {
+        if (res2.StorageResource.equals(resource_new_name) && res2.PolicyName.equals(policy_name1)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 1)
+
+    // check that ${policy_name2} now points at ${resource_new_name}
+    records = sql_return_maparray "show storage policy"
+    found = 0
+    for (def res2 : records) {
+        if (res2.StorageResource.equals(resource_new_name) && res2.PolicyName.equals(policy_name2)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 1)
+
+    // wait for the data to cool down
+    result = sql "show data FROM ${dbName}.${tableName}"
+    sqlResult = result[0][5].toString();
+    count = 0;
+    while (sqlResult.contains("0.00")) {
+        if (++count >= 360) { // 360 * 5s = 30min
+            logger.error('cooldown task timed out')
+            throw new Exception("cooldown task timed out after 30 mins")
+        }
+        Thread.sleep(5000)
+        result = sql "show data FROM ${dbName}.${tableName}"
+        sqlResult = result[0][5].toString();
+    }
+    assertNotEquals('0.000 ', result[0][5].toString())
+
+    logger.info(" ====================================== 4.4 ==================================== ")
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+    try_sql """
+        drop storage policy ${policy_name2};
+    """
+
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "storage_resource"="${resource_new_name}",
+            "reserve_storage_policy"="false"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    // check that the table has no storage policy
+    records = sql_return_maparray "show storage policy using"
+    found = 0
+    for (def res2 : records) {
+        if (res2.Database.equals(dbName) && res2.Table.equals(tableName)) {
+            found = 1
+            break
+        }
+    }
+    assertEquals(found, 0)
+
+    // cleanup
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE"
+    sql "DROP DATABASE ${dbName} FORCE"
+    sql "DROP REPOSITORY `${repoName}`"
+
+    try_sql """
+        drop storage policy ${policy_name1};
+    """
+
+    try_sql """
+        drop resource ${resource_name1};
+    """
+
+    try_sql """
+        drop storage policy ${policy_name2};
+    """
+
+    try_sql """
+        drop resource ${resource_name2};
+    """
+}
\ No newline at end of file
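For reference, the two restore-side properties exercised above can be combined in a single statement. A representative invocation in the suite's own style (identifiers as defined in the last suite; case 4.3 above asserts that this combination succeeds and the restored data cools down onto the new resource):

    sql """
        RESTORE SNAPSHOT ${dbName}.${snapshotName}
        FROM `${repoName}`
        ON (`${tableName}`)
        PROPERTIES
        (
            "backup_timestamp" = "${snapshot}",
            "reserve_replica" = "true",
            "storage_resource" = "${resource_new_name}",
            "reserve_storage_policy" = "true"
        )
    """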