From 60b9dc20b29139d4feef9fe05f83cdf78a15e895 Mon Sep 17 00:00:00 2001 From: ayuanzhang Date: Thu, 12 Dec 2024 18:56:21 +0800 Subject: [PATCH] [feature](cooldown)backup cooldown data --- be/src/io/fs/s3_file_system.h | 3 +- be/src/olap/olap_server.cpp | 2 +- be/src/olap/rowset/beta_rowset.cpp | 1 - be/src/olap/rowset/rowset_meta.cpp | 4 + be/src/olap/rowset/rowset_meta.h | 2 + be/src/olap/single_replica_compaction.cpp | 2 +- be/src/olap/snapshot_manager.cpp | 17 +- be/src/olap/snapshot_manager.h | 8 +- be/src/olap/task/engine_clone_task.cpp | 2 +- .../task/engine_storage_migration_task.cpp | 2 +- be/src/runtime/snapshot_loader.cpp | 190 ++- .../apache/doris/analysis/RestoreStmt.java | 35 + .../apache/doris/backup/BackupHandler.java | 6 +- .../org/apache/doris/backup/BackupJob.java | 38 +- .../apache/doris/backup/BackupJobInfo.java | 30 + .../org/apache/doris/backup/BackupMeta.java | 35 +- .../org/apache/doris/backup/RestoreJob.java | 224 ++- .../org/apache/doris/catalog/OlapTable.java | 6 +- .../apache/doris/catalog/PartitionInfo.java | 8 +- .../org/apache/doris/catalog/ResourceMgr.java | 7 + .../org/apache/doris/catalog/S3Resource.java | 37 + .../java/org/apache/doris/policy/Policy.java | 4 + .../org/apache/doris/policy/PolicyMgr.java | 25 +- .../apache/doris/policy/StoragePolicy.java | 57 +- .../doris/service/FrontendServiceImpl.java | 14 + .../doris/backup/BackupHandlerTest.java | 4 +- .../apache/doris/backup/RestoreJobTest.java | 6 +- .../test_backup_restore_cold_data.groovy | 1506 +++++++++++++++++ 28 files changed, 2197 insertions(+), 78 deletions(-) create mode 100644 regression-test/suites/backup_restore/test_backup_restore_cold_data.groovy diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index f6efa5053324ff..748338632ad3ff 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -120,8 +120,7 @@ class S3FileSystem final : public RemoteFileSystem { // so no need to concat with prefix abs_path = path; } else { - // path with no schema - abs_path = _prefix / path; + abs_path = std::filesystem::path(fmt::format("s3://{}/{}", _bucket, _prefix)) / path; } return Status::OK(); } diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 90d0883984e78b..8884b1b1f926b2 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -1314,7 +1314,7 @@ void StorageEngine::do_remove_unused_remote_files() { } cooldown_meta_id = t->tablet_meta()->cooldown_meta_id(); } - auto [cooldown_replica_id, cooldown_term] = t->cooldown_conf(); + auto [cooldown_term, cooldown_replica_id] = t->cooldown_conf(); if (cooldown_replica_id != t->replica_id()) { return; } diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index cd52deed0c8a4d..8ca04925bfdd45 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -438,7 +438,6 @@ Status BetaRowset::upload_to(const StorageResource& dest_fs, const RowsetId& new return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", _rowset_meta->tablet_id(), rowset_id().to_string()); } - if (num_segments() < 1) { return Status::OK(); } diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp index 6bed5e800ede4d..034aab25ea569d 100644 --- a/be/src/olap/rowset/rowset_meta.cpp +++ b/be/src/olap/rowset/rowset_meta.cpp @@ -133,6 +133,10 @@ bool RowsetMeta::has_variant_type_in_schema() const { return _schema && _schema->num_variant_columns() > 0; } +void RowsetMeta::clear_resource_id() { + _rowset_meta_pb.clear_resource_id(); +} + void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema) const { *rs_meta_pb = _rowset_meta_pb; if (_schema) [[likely]] { diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 46121aeae2be6d..592a503663b042 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -60,6 +60,8 @@ class RowsetMeta : public MetadataAdder { void set_remote_storage_resource(StorageResource resource); + void clear_resource_id(); + const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); } bool is_local() const { return !_rowset_meta_pb.has_resource_id(); } diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp index 458f3949b17017..10cedf8c50ec26 100644 --- a/be/src/olap/single_replica_compaction.cpp +++ b/be/src/olap/single_replica_compaction.cpp @@ -321,7 +321,7 @@ Status SingleReplicaCompaction::_fetch_rowset(const TReplicaInfo& addr, const st RETURN_IF_ERROR(_download_files(tablet()->data_dir(), remote_url_prefix, local_path)); _pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids( local_path, _tablet->tablet_id(), tablet()->replica_id(), _tablet->table_id(), - _tablet->partition_id(), _tablet->schema_hash())); + _tablet->partition_id(), _tablet->schema_hash(), false, 0)); // 4: finish_clone: create output_rowset and link file return _finish_clone(local_data_path, rowset_version); } diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index 67205835b53947..5c230c44be183c 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -39,6 +39,7 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "olap/data_dir.h" #include "olap/olap_common.h" @@ -140,7 +141,7 @@ Status SnapshotManager::release_snapshot(const string& snapshot_path) { Result> SnapshotManager::convert_rowset_ids( const std::string& clone_dir, int64_t tablet_id, int64_t replica_id, int64_t table_id, - int64_t partition_id, int32_t schema_hash) { + int64_t partition_id, int32_t schema_hash, bool is_restore, int64_t storage_policy_id) { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); std::vector guards; // check clone dir existed @@ -170,6 +171,10 @@ Result> SnapshotManager::convert_rowset_ids( new_tablet_meta_pb.set_tablet_id(tablet_id); *new_tablet_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto(); new_tablet_meta_pb.set_replica_id(replica_id); + if (is_restore) { + new_tablet_meta_pb.set_storage_policy_id(storage_policy_id); + new_tablet_meta_pb.clear_cooldown_meta_id(); + } if (table_id > 0) { new_tablet_meta_pb.set_table_id(table_id); } @@ -202,6 +207,9 @@ Result> SnapshotManager::convert_rowset_ids( } else { // remote rowset *rowset_meta = visible_rowset; + if (is_restore) { + rowset_meta->clear_resource_id(); + } } rowset_meta->set_tablet_id(tablet_id); @@ -489,11 +497,8 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet if (!is_single_rowset_clone && (!res.ok() || request.missing_version.empty())) { if (!request.__isset.missing_version && ref_tablet->tablet_meta()->cooldown_meta_id().initialized()) { - LOG(WARNING) << "currently not support backup tablet with cooldowned remote " - "data. tablet=" - << request.tablet_id; - return Status::NotSupported( - "currently not support backup tablet with cooldowned remote data"); + LOG(INFO) << "Backup tablet with cooldowned remote data. tablet=" + << request.tablet_id; } /// not all missing versions are found, fall back to full snapshot. res = Status::OK(); // reset res diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h index dd10f7f355058b..306a01b83513ba 100644 --- a/be/src/olap/snapshot_manager.h +++ b/be/src/olap/snapshot_manager.h @@ -51,11 +51,9 @@ class SnapshotManager { // @param snapshot_path [in] 要被释放的snapshot的路径,只包含到ID Status release_snapshot(const std::string& snapshot_path); - Result> convert_rowset_ids(const std::string& clone_dir, - int64_t tablet_id, - int64_t replica_id, int64_t table_id, - int64_t partition_id, - int32_t schema_hash); + Result> convert_rowset_ids( + const std::string& clone_dir, int64_t tablet_id, int64_t replica_id, int64_t table_id, + int64_t partition_id, int32_t schema_hash, bool is_restore, int64_t storage_policy_id); private: Status _calc_snapshot_id_path(const TabletSharedPtr& tablet, int64_t timeout_s, diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index fa8d9b8248e3f4..5d6f9190742e22 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -459,7 +459,7 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir, // No need to try again with another BE _pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids( local_data_path, _clone_req.tablet_id, _clone_req.replica_id, _clone_req.table_id, - _clone_req.partition_id, _clone_req.schema_hash)); + _clone_req.partition_id, _clone_req.schema_hash, false, 0)); break; } // clone copy from one backend return status; diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index a300e6e0f09fa3..aaaf3e203280cf 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -161,7 +161,7 @@ Status EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file( // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load _pending_rs_guards = DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids( full_path, tablet_id, _tablet->replica_id(), _tablet->table_id(), - _tablet->partition_id(), schema_hash)); + _tablet->partition_id(), schema_hash, false, 0)); return Status::OK(); } diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 784904c78a3fb1..5f4d9fa9ddd51a 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -38,6 +38,7 @@ #include "gutil/strings/split.h" #include "http/http_client.h" #include "io/fs/broker_file_system.h" +#include "io/fs/file_reader.h" #include "io/fs/file_system.h" #include "io/fs/hdfs_file_system.h" #include "io/fs/local_file_system.h" @@ -46,8 +47,10 @@ #include "io/fs/s3_file_system.h" #include "io/hdfs_builder.h" #include "olap/data_dir.h" +#include "olap/olap_define.h" #include "olap/snapshot_manager.h" #include "olap/storage_engine.h" +#include "olap/storage_policy.h" #include "olap/tablet.h" #include "olap/tablet_manager.h" #include "runtime/client_cache.h" @@ -120,6 +123,161 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l SnapshotLoader::~SnapshotLoader() = default; +static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs, + const std::string& dir, const std::string& rowset, + std::vector* remote_files) { + bool exists = true; + std::vector files; + RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists)); + for (auto& tmp_file : files) { + io::Path path(tmp_file.file_name); + std::string file_name = path.filename(); + + if (file_name.substr(0, rowset.length()).compare(rowset) != 0 || + !_end_with(file_name, ".idx")) { + continue; + } + remote_files->push_back(file_name); + } + + return Status::OK(); +} + +static Status check_need_upload(const std::string& src_path, const std::string& local_file, + std::map& remote_files, std::string* md5sum, + bool* need_upload) { + // calc md5sum of localfile + RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(src_path + "/" + local_file, md5sum)); + VLOG_CRITICAL << "get file checksum: " << local_file << ": " << *md5sum; + + // check if this local file need upload + auto find = remote_files.find(local_file); + if (find != remote_files.end()) { + if (*md5sum != find->second.md5) { + // remote storage file exist, but with different checksum + LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first + << ", local: " << *md5sum; + // TODO(cmy): save these files and delete them later + *need_upload = true; + } + } else { + *need_upload = true; + } + + return Status::OK(); +} + +static Status download_and_upload_one_cold_file( + io::RemoteFileSystem& dest_fs, const StorageResource& cold_fs, + const std::string& remote_seg_path, const std::string& local_seg_path, + const std::string& dest_seg_path, const std::string& local_path, + const std::string& local_file, std::map& remote_files) { + RETURN_IF_ERROR(cold_fs.fs.get()->download(remote_seg_path, local_seg_path)); + + bool need_upload = false; + std::string md5sum; + RETURN_IF_ERROR(check_need_upload(local_path, local_file, remote_files, &md5sum, &need_upload)); + + if (!need_upload) { + VLOG_CRITICAL << "cold file exist in remote path, no need to upload: " << local_file; + return Status::OK(); + } + + RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path, dest_seg_path, md5sum)); + + //delete local file + RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path)); + + return Status::OK(); +} + +static Status upload_remote_cold_rowset(io::RemoteFileSystem& dest_fs, int64_t tablet_id, + const std::string& local_path, const std::string& dest_path, + const StorageResource& cold_fs, + const std::string& rowset_id, int segments, + int have_inverted_index, + std::map& remote_files) { + Status res = Status::OK(); + + std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX, tablet_id); + + for (int i = 0; i < segments; i++) { + std::string local_file = fmt::format("{}_{}.dat", rowset_id, i); + std::string remote_seg_path = cold_fs.remote_segment_path(tablet_id, rowset_id, i); + std::string local_seg_path = local_segment_path(local_path, rowset_id, i); + std::string dest_seg_path = local_segment_path(dest_path, rowset_id, i); + + RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_seg_path, + local_seg_path, dest_seg_path, local_path, + local_file, remote_files)); + } + + if (!have_inverted_index) { + return res; + } + + std::vector remote_index_files; + RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs.fs.get(), remote_tablet_path, + rowset_id, &remote_index_files)); + + for (auto& index_file : remote_index_files) { + std::string remote_index_path = fmt::format("{}/{}", remote_tablet_path, index_file); + std::string local_seg_path = fmt::format("{}/{}", local_path, index_file); + std::string dest_seg_path = fmt::format("{}/{}", dest_path, index_file); + + RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_index_path, + local_seg_path, dest_seg_path, local_path, + index_file, remote_files)); + } + return res; +} + +/* + * get the cooldown data info from the hdr file, download the cooldown data and + * upload it to remote storage. + */ +static Status upload_remote_cold_file(io::RemoteFileSystem& dest_fs, int64_t tablet_id, + const std::string& local_path, const std::string& dest_path, + std::map& remote_files) { + Status res = Status::OK(); + std::string hdr_file = local_path + "/" + std::to_string(tablet_id) + ".hdr"; + auto tablet_meta = std::make_shared(); + res = tablet_meta->create_from_file(hdr_file); + if (!res.ok()) { + return Status::Error( + "fail to load tablet_meta. file_path={}", hdr_file); + } + + if (tablet_meta->tablet_id() != tablet_id) { + return Status::InternalError("Invalid tablet {}", tablet_meta->tablet_id()); + } + + if (!tablet_meta->cooldown_meta_id().initialized()) { + return res; + } + + string rowset_id; + int segments; + int have_inverted_index; + + auto storage_resource = + DORIS_TRY(get_resource_by_storage_policy_id(tablet_meta->storage_policy_id())); + + for (auto rowset_meta : tablet_meta->all_rs_metas()) { + rowset_id = rowset_meta->rowset_id().to_string(); + segments = rowset_meta->num_segments(); + have_inverted_index = rowset_meta->tablet_schema()->has_inverted_index(); + + if (segments > 0 && !rowset_meta->is_local()) { + RETURN_IF_ERROR(upload_remote_cold_rowset(dest_fs, tablet_id, local_path, dest_path, + storage_resource, rowset_id, segments, + have_inverted_index, remote_files)); + } + } + + return res; +} + Status SnapshotLoader::upload(const std::map& src_to_dest_path, std::map>* tablet_files) { if (!_remote_fs) { @@ -168,29 +326,11 @@ Status SnapshotLoader::upload(const std::map& src_to_d for (auto& local_file : local_files) { RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, TTaskType::type::UPLOAD)); - - // calc md5sum of localfile + bool need_upload = false; std::string md5sum; RETURN_IF_ERROR( - io::global_local_filesystem()->md5sum(src_path + "/" + local_file, &md5sum)); - VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum; + check_need_upload(src_path, local_file, remote_files, &md5sum, &need_upload)); local_files_with_checksum.push_back(local_file + "." + md5sum); - - // check if this local file need upload - bool need_upload = false; - auto find = remote_files.find(local_file); - if (find != remote_files.end()) { - if (md5sum != find->second.md5) { - // remote storage file exist, but with different checksum - LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first - << ", local: " << md5sum; - // TODO(cmy): save these files and delete them later - need_upload = true; - } - } else { - need_upload = true; - } - if (!need_upload) { VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file; continue; @@ -202,6 +342,10 @@ Status SnapshotLoader::upload(const std::map& src_to_d RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum)); } // end for each tablet's local files + // 2.4. upload cooldown data files + RETURN_IF_ERROR( + upload_remote_cold_file(*_remote_fs, tablet_id, src_path, dest_path, remote_files)); + tablet_files->emplace(tablet_id, local_files_with_checksum); finished_num++; LOG(INFO) << "finished to write tablet to remote. local path: " << src_path @@ -754,9 +898,9 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta } // rename the rowset ids and tabletid info in rowset meta - auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id, - tablet->replica_id(), tablet->table_id(), - tablet->partition_id(), schema_hash); + auto res = _engine.snapshot_mgr()->convert_rowset_ids( + snapshot_path, tablet_id, tablet->replica_id(), tablet->table_id(), + tablet->partition_id(), schema_hash, true, tablet->storage_policy_id()); if (!res.has_value()) [[unlikely]] { auto err_msg = fmt::format("failed to convert rowsetids in snapshot: {}, tablet path: {}, err: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index bc38cfe09e5606..655b5fb017d8da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -18,7 +18,9 @@ package org.apache.doris.analysis; import org.apache.doris.backup.Repository; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.catalog.Resource; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; @@ -44,18 +46,22 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars public static final String PROP_CLEAN_TABLES = "clean_tables"; public static final String PROP_CLEAN_PARTITIONS = "clean_partitions"; public static final String PROP_ATOMIC_RESTORE = "atomic_restore"; + public static final String PROP_STORAGE_RESOURCE = "storage_resource"; + public static final String PROP_RESERVE_STORAGE_POLICY = "reserve_storage_policy"; private boolean allowLoad = false; private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; private String backupTimestamp = null; private int metaVersion = -1; private boolean reserveReplica = false; + private boolean reserveStoragePolicy = true; private boolean reserveDynamicPartitionEnable = false; private boolean isLocal = false; private boolean isBeingSynced = false; private boolean isCleanTables = false; private boolean isCleanPartitions = false; private boolean isAtomicRestore = false; + private String storageResource = null; private byte[] meta = null; private byte[] jobInfo = null; @@ -83,6 +89,10 @@ public String getBackupTimestamp() { return backupTimestamp; } + public String getStorageResource() { + return storageResource; + } + public int getMetaVersion() { return metaVersion; } @@ -91,6 +101,10 @@ public boolean reserveReplica() { return reserveReplica; } + public boolean reserveStoragePolicy() { + return reserveStoragePolicy; + } + public boolean reserveDynamicPartitionEnable() { return reserveDynamicPartitionEnable; } @@ -212,6 +226,27 @@ public void analyzeProperties() throws AnalysisException { // is atomic restore isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore); + if (copiedProperties.containsKey(PROP_STORAGE_RESOURCE)) { + storageResource = copiedProperties.get(PROP_STORAGE_RESOURCE); + Resource localResource = Env.getCurrentEnv().getResourceMgr().getResource(storageResource); + + if (localResource == null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, + "Restore storage resource " + storageResource + " is not exist"); + } + + if (localResource.getType() != Resource.ResourceType.S3) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, + "The type of local resource " + + storageResource + " is not same as restored resource"); + } + + copiedProperties.remove(PROP_STORAGE_RESOURCE); + } + + // reserve storage policy + reserveStoragePolicy = eatBooleanProperty(copiedProperties, PROP_RESERVE_STORAGE_POLICY, reserveStoragePolicy); + if (!copiedProperties.isEmpty()) { ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, "Unknown restore job properties: " + copiedProperties.keySet()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 6f88881e3cb2a3..57c38cc6c50bba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -560,14 +560,14 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), - stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), - env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); + stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.getStorageResource(), + stmt.reserveStoragePolicy(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); } else { restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(), db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), - env, repository.getId()); + stmt.getStorageResource(), stmt.reserveStoragePolicy(), env, repository.getId()); } env.getEditLog().logRestoreJob(restoreJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index de12670807f20e..32c49e7d01f9a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.Table; @@ -43,6 +44,7 @@ import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.policy.StoragePolicy; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; @@ -62,6 +64,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -494,6 +497,7 @@ private void prepareAndSendSnapshotTask() { // copy all related schema at this moment List copiedTables = Lists.newArrayList(); List copiedResources = Lists.newArrayList(); + List copiedStoragePolicys = Lists.newArrayList(); AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc); for (TableRef tableRef : tableRefs) { String tblName = tableRef.getName().getTbl(); @@ -513,7 +517,17 @@ private void prepareAndSendSnapshotTask() { if (getContent() == BackupContent.ALL) { prepareSnapshotTaskForOlapTableWithoutLock(db, (OlapTable) tbl, tableRef, batchTask); } - prepareBackupMetaForOlapTableWithoutLock(tableRef, olapTable, copiedTables); + prepareBackupMetaForOlapTableWithoutLock(tableRef, olapTable, copiedTables, + copiedStoragePolicys); + for (StoragePolicy policy : copiedStoragePolicys) { + Resource resource = Env.getCurrentEnv().getResourceMgr() + .getResource(policy.getStorageResource()); + if (resource.getType() != Resource.ResourceType.S3) { + status = new Status(ErrCode.COMMON_ERROR, + "backup job only support S3 type storage policy:" + resource.getType()); + return; + } + } break; case VIEW: prepareBackupMetaForViewWithoutLock((View) tbl, copiedTables); @@ -547,7 +561,7 @@ private void prepareAndSendSnapshotTask() { return; } - backupMeta = new BackupMeta(copiedTables, copiedResources); + backupMeta = new BackupMeta(copiedTables, copiedResources, copiedStoragePolicys); // send tasks for (AgentTask task : batchTask.getAllTasks()) { @@ -666,7 +680,8 @@ private void checkResourceForOdbcTable(OdbcTable odbcTable) { } private void prepareBackupMetaForOlapTableWithoutLock(TableRef tableRef, OlapTable olapTable, - List
copiedTables) { + List
copiedTables, + List copiedStoragePolicys) { // only copy visible indexes List reservedPartitions = tableRef.getPartitionNames() == null ? null : tableRef.getPartitionNames().getPartitionNames(); @@ -678,6 +693,23 @@ private void prepareBackupMetaForOlapTableWithoutLock(TableRef tableRef, OlapTab removeUnsupportProperties(copiedTbl); copiedTables.add(copiedTbl); + + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + // classify a table's all partitions by storage policy + for (Long partitionId : olapTable.getPartitionIds()) { + String policyName = partitionInfo.getDataProperty(partitionId).getStoragePolicy(); + if (StringUtils.isEmpty(policyName)) { + continue; + } + + StoragePolicy checkedPolicyCondition = StoragePolicy.ofCheck(policyName); + StoragePolicy storagePolicy = (StoragePolicy) Env.getCurrentEnv().getPolicyMgr() + .getPolicy(checkedPolicyCondition); + + if (storagePolicy != null && !copiedStoragePolicys.contains(storagePolicy)) { + copiedStoragePolicys.add(storagePolicy); + } + } } private void prepareBackupMetaForViewWithoutLock(View view, List
copiedTables) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java index 554a21c44080f7..ca730f0c8cae7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Resource; +import org.apache.doris.catalog.S3Resource; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; @@ -41,6 +42,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.policy.StoragePolicy; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Joiner; @@ -335,6 +337,10 @@ public static class BriefBackupJobInfo { public List odbcTableList = Lists.newArrayList(); @SerializedName("odbc_resource_list") public List odbcResourceList = Lists.newArrayList(); + @SerializedName("s3_resource_list") + public List s3ResourceList = Lists.newArrayList(); + @SerializedName("storage_policy_list") + public List storagePolicyList = Lists.newArrayList(); public static BriefBackupJobInfo fromBackupJobInfo(BackupJobInfo backupJobInfo) { BriefBackupJobInfo briefBackupJobInfo = new BriefBackupJobInfo(); @@ -352,6 +358,8 @@ public static BriefBackupJobInfo fromBackupJobInfo(BackupJobInfo backupJobInfo) briefBackupJobInfo.viewList = backupJobInfo.newBackupObjects.views; briefBackupJobInfo.odbcTableList = backupJobInfo.newBackupObjects.odbcTables; briefBackupJobInfo.odbcResourceList = backupJobInfo.newBackupObjects.odbcResources; + briefBackupJobInfo.s3ResourceList = backupJobInfo.newBackupObjects.s3Resources; + briefBackupJobInfo.storagePolicyList = backupJobInfo.newBackupObjects.storagePolicies; return briefBackupJobInfo; } } @@ -370,6 +378,10 @@ public static class NewBackupObjects { public List odbcTables = Lists.newArrayList(); @SerializedName("odbc_resources") public List odbcResources = Lists.newArrayList(); + @SerializedName("s3_resources") + public List s3Resources = Lists.newArrayList(); + @SerializedName("storage_policy") + public List storagePolicies = Lists.newArrayList(); } public static class BackupOlapTableInfo { @@ -488,6 +500,11 @@ public static class BackupOdbcResourceInfo { public String name; } + public static class BackupS3ResourceInfo { + @SerializedName("name") + public String name; + } + // eg: __db_10001/__tbl_10002/__part_10003/__idx_10002/__10004 public String getFilePath(String db, String tbl, String part, String idx, long tabletId) { if (!db.equalsIgnoreCase(dbName)) { @@ -687,6 +704,19 @@ public static BackupJobInfo fromCatalog(long backupTime, String label, String db backupOdbcResourceInfo.name = odbcCatalogResource.getName(); jobInfo.newBackupObjects.odbcResources.add(backupOdbcResourceInfo); } + + if (resource instanceof S3Resource) { + S3Resource s3Resource = (S3Resource) resource; + BackupS3ResourceInfo backupS3ResourceInfo = new BackupS3ResourceInfo(); + backupS3ResourceInfo.name = s3Resource.getName(); + jobInfo.newBackupObjects.s3Resources.add(backupS3ResourceInfo); + } + } + + // storage policies + Collection storagePolicies = backupMeta.getStoragePolicyNameMap().values(); + for (StoragePolicy storagePolicy : storagePolicies) { + jobInfo.newBackupObjects.storagePolicies.add(storagePolicy.clone()); } return jobInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java index 0f1a043bdada3b..4947e9eeed74cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java @@ -26,6 +26,7 @@ import org.apache.doris.meta.MetaContext; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.policy.StoragePolicy; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; @@ -52,11 +53,13 @@ public class BackupMeta implements Writable, GsonPostProcessable { // resource name -> resource @SerializedName(value = "resourceNameMap") private Map resourceNameMap = Maps.newHashMap(); + // storagePolicy name -> resource + private Map storagePolicyNameMap = Maps.newHashMap(); private BackupMeta() { } - public BackupMeta(List
tables, List resources) { + public BackupMeta(List
tables, List resources, List storagePolicies) { for (Table table : tables) { tblNameMap.put(table.getName(), table); tblIdMap.put(table.getId(), table); @@ -64,6 +67,24 @@ public BackupMeta(List
tables, List resources) { for (Resource resource : resources) { resourceNameMap.put(resource.getName(), resource); } + + for (StoragePolicy policy : storagePolicies) { + storagePolicyNameMap.put(policy.getName(), policy); + + if (resourceNameMap.get(policy.getStorageResource()) != null) { + continue; + } + Resource resource = Env.getCurrentEnv().getResourceMgr() + .getResource(policy.getStorageResource()); + if (resource.getType() != Resource.ResourceType.S3) { + continue; + } + Resource copiedResource = resource.clone(); + if (copiedResource == null) { + continue; + } + resourceNameMap.put(policy.getStorageResource(), copiedResource); + } } public Map getTables() { @@ -74,6 +95,10 @@ public Map getResourceNameMap() { return resourceNameMap; } + public Map getStoragePolicyNameMap() { + return storagePolicyNameMap; + } + public Table getTable(String tblName) { return tblNameMap.get(tblName); } @@ -130,6 +155,14 @@ public static BackupMeta read(DataInput in) throws IOException { @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); + out.writeInt(tblNameMap.size()); + for (Table table : tblNameMap.values()) { + table.write(out); + } + out.writeInt(resourceNameMap.size()); + for (Resource resource : resourceNameMap.values()) { + resource.write(out); + } } @Deprecated diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 6dfd02b3a42648..51348345b1d5ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -49,6 +49,7 @@ import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.ResourceMgr; +import org.apache.doris.catalog.S3Resource; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; @@ -68,8 +69,13 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.property.S3ClientBEProperties; +import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.policy.Policy; +import org.apache.doris.policy.PolicyMgr; +import org.apache.doris.policy.PolicyTypeEnum; +import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.Tag; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; @@ -100,6 +106,7 @@ import com.google.common.collect.Table.Cell; import com.google.gson.annotations.SerializedName; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -110,6 +117,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -123,6 +131,8 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES; private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS; private static final String PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE; + private static final String PROP_STORAGE_RESOURCE = RestoreStmt.PROP_STORAGE_RESOURCE; + private static final String PROP_RESERVE_STORAGE_POLICY = RestoreStmt.PROP_RESERVE_STORAGE_POLICY; private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__"; private static final Logger LOG = LogManager.getLogger(RestoreJob.class); @@ -182,6 +192,8 @@ public enum RestoreJobState { private List
restoredTbls = Lists.newArrayList(); @SerializedName("rr") private List restoredResources = Lists.newArrayList(); + @SerializedName("sp") + private List storagePolicies = Lists.newArrayList(); // save all restored partitions' version info which are already exist in catalog // table id -> partition id -> (version, version hash) @@ -210,7 +222,10 @@ public enum RestoreJobState { private boolean isCleanPartitions = false; // Whether to restore the data into a temp table, and then replace the origin one. private boolean isAtomicRestore = false; - + // the target storage resource + private String storageResource = null; + // whether to reserve storage policy + private boolean reserveStoragePolicy = false; // restore properties @SerializedName("prop") private Map properties = Maps.newHashMap(); @@ -228,7 +243,8 @@ public RestoreJob(JobType jobType) { public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) { + boolean isCleanPartitions, boolean isAtomicRestore, String storageResource, + boolean reserveStoragePolicy, Env env, long repoId) { super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId); this.backupTimestamp = backupTs; this.jobInfo = jobInfo; @@ -246,21 +262,26 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu this.isCleanTables = isCleanTables; this.isCleanPartitions = isCleanPartitions; this.isAtomicRestore = isAtomicRestore; + this.storageResource = storageResource; + this.reserveStoragePolicy = reserveStoragePolicy; properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica)); properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable)); properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced)); properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables)); properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions)); properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore)); + properties.put(PROP_STORAGE_RESOURCE, String.valueOf(storageResource)); + properties.put(PROP_RESERVE_STORAGE_POLICY, String.valueOf(reserveStoragePolicy)); } public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) { + boolean isCleanPartitions, boolean isAtomicRestore, String storageResource, + boolean reserveStoragePolicy, Env env, long repoId, BackupMeta backupMeta) { this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica, - reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env, - repoId); + reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, + storageResource, reserveStoragePolicy, env, repoId); this.backupMeta = backupMeta; } @@ -665,6 +686,32 @@ private void checkAndPrepareMeta() { } } + for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo : jobInfo.newBackupObjects.s3Resources) { + Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(storageResource != null + ? storageResource : backupS3ResourceInfo.name); + if (resource == null) { + continue; + } + if (resource.getType() != Resource.ResourceType.S3) { + status = new Status(ErrCode.COMMON_ERROR, + "The local resource " + resource.getName() + + " with the same name but a different type of backup meta."); + return; + } + } + + for (StoragePolicy backupStoragePolicy : jobInfo.newBackupObjects.storagePolicies) { + String backupStoragePoliceName = backupStoragePolicy.getName(); + Optional localPolicy = Env.getCurrentEnv().getPolicyMgr().findPolicy(backupStoragePoliceName, + PolicyTypeEnum.STORAGE); + if (localPolicy.isPresent() && localPolicy.get().getType() != PolicyTypeEnum.STORAGE) { + status = new Status(ErrCode.COMMON_ERROR, + "The local policy " + backupStoragePoliceName + + " with the same name but a different type of backup meta."); + return; + } + } + // the new tablets -> { local tablet, schema hash, storage medium }, used in atomic restore. Map tabletBases = new HashMap<>(); @@ -721,6 +768,16 @@ private void checkAndPrepareMeta() { BackupPartitionInfo backupPartInfo = partitionEntry.getValue(); Partition localPartition = localOlapTbl.getPartition(partitionName); Partition remotePartition = remoteOlapTbl.getPartition(partitionName); + + String policyName = remoteOlapTbl.getPartitionInfo() + .getDataProperty(remotePartition.getId()).getStoragePolicy(); + if (StringUtils.isNotEmpty(policyName)) { + status = new Status(ErrCode.COMMON_ERROR, "Can't restore remote partition " + + partitionName + " in table " + remoteTbl.getName() + " with storage policy " + + policyName + " when local table " + localTbl.getName() + " exist." + + " Please drop old table and restore again."); + return; + } if (localPartition != null) { // Partition already exist. PartitionInfo localPartInfo = localOlapTbl.getPartitionInfo(); @@ -806,7 +863,8 @@ private void checkAndPrepareMeta() { // reset all ids in this table String srcDbName = jobInfo.dbName; - Status st = remoteOlapTbl.resetIdsForRestore(env, db, replicaAlloc, reserveReplica, srcDbName); + Status st = remoteOlapTbl.resetIdsForRestore(env, db, replicaAlloc, + reserveReplica, reserveStoragePolicy, srcDbName); if (!st.ok()) { status = st; return; @@ -897,6 +955,18 @@ private void checkAndPrepareMeta() { if (isAtomicRestore && !restoredPartitions.isEmpty()) { throw new RuntimeException("atomic restore is set, but the restored partitions is not empty"); } + + // check and restore resources + checkAndRestoreResources(); + if (!status.ok()) { + return; + } + // check and restore storage policies, should before createReplicas to get storage_policy_id + checkAndRestoreStoragePolicies(); + if (!status.ok()) { + return; + } + for (Pair entry : restoredPartitions) { OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first); Preconditions.checkNotNull(localTbl, localTbl.getName()); @@ -952,11 +1022,6 @@ private void checkAndPrepareMeta() { db.readUnlock(); } - // check and restore resources - checkAndRestoreResources(); - if (!status.ok()) { - return; - } if (LOG.isDebugEnabled()) { LOG.debug("finished to restore resources. {}", this.jobId); } @@ -1253,7 +1318,7 @@ private void checkAndRestoreResources() { } else { try { // restore resource - resourceMgr.createResource(remoteOdbcResource, false); + resourceMgr.createResource(remoteOdbcResource); } catch (DdlException e) { status = new Status(ErrCode.COMMON_ERROR, e.getMessage()); return; @@ -1261,6 +1326,99 @@ private void checkAndRestoreResources() { restoredResources.add(remoteOdbcResource); } } + + if (!reserveStoragePolicy) { + return; + } + + for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo : jobInfo.newBackupObjects.s3Resources) { + String backupResourceName = backupS3ResourceInfo.name; + Resource localResource = resourceMgr.getResource(storageResource != null + ? storageResource : backupResourceName); + S3Resource remoteS3Resource = (S3Resource) backupMeta.getResource(backupResourceName); + + if (storageResource != null) { + if (localResource != null) { + if (localResource.getType() != Resource.ResourceType.S3) { + status = new Status(ErrCode.COMMON_ERROR, "The type of local resource " + + backupResourceName + " is not same as restored resource"); + return; + } + S3Resource localS3Resource = (S3Resource) localResource; + if (localS3Resource.getProperty(S3Properties.ENDPOINT) + .equals(remoteS3Resource.getProperty(S3Properties.ENDPOINT)) + && localS3Resource.getProperty(S3Properties.BUCKET) + .equals(remoteS3Resource.getProperty(S3Properties.BUCKET)) + && localS3Resource.getProperty(S3Properties.ROOT_PATH) + .equals(remoteS3Resource.getProperty(S3Properties.ROOT_PATH))) { + status = new Status(ErrCode.COMMON_ERROR, "local S3 resource " + + storageResource + " root path " + localS3Resource.getProperty(S3Properties.ROOT_PATH) + + " should not same as restored resource root path"); + return; + } + } else { + status = new Status(ErrCode.COMMON_ERROR, + "The local resource " + storageResource + " is not exist."); + return; + } + } else { + if (localResource != null) { + if (localResource.getType() != Resource.ResourceType.S3) { + status = new Status(ErrCode.COMMON_ERROR, "The type of local resource " + + backupResourceName + " is not same as restored resource"); + return; + } + S3Resource localS3Resource = (S3Resource) localResource; + if (localS3Resource.getSignature(BackupHandler.SIGNATURE_VERSION) + != remoteS3Resource.getSignature(BackupHandler.SIGNATURE_VERSION)) { + status = new Status(ErrCode.COMMON_ERROR, "S3 resource " + + jobInfo.getAliasByOriginNameIfSet(backupResourceName) + + " already exist but with different properties"); + return; + } + } else { + status = new Status(ErrCode.COMMON_ERROR, "Local resource " + + backupResourceName + " is not exist"); + return; + } + } + } + } + + private void checkAndRestoreStoragePolicies() { + if (!reserveStoragePolicy) { + return; + } + PolicyMgr policyMgr = Env.getCurrentEnv().getPolicyMgr(); + for (StoragePolicy backupStoragePolicy : jobInfo.newBackupObjects.storagePolicies) { + String backupStoragePoliceName = backupStoragePolicy.getName(); + Optional localPolicy = policyMgr.findPolicy(backupStoragePoliceName, + PolicyTypeEnum.STORAGE); + // use specified storageResource + if (storageResource != null) { + backupStoragePolicy.setStorageResource(storageResource); + } + if (localPolicy.isPresent()) { + StoragePolicy localStoargePolicy = (StoragePolicy) localPolicy.get(); + // storage policy name and resource name should be same + if (localStoargePolicy.getSignature(BackupHandler.SIGNATURE_VERSION) + != backupStoragePolicy.getSignature(BackupHandler.SIGNATURE_VERSION)) { + status = new Status(ErrCode.COMMON_ERROR, "Storage policy " + + jobInfo.getAliasByOriginNameIfSet(backupStoragePoliceName) + + " already exist but with different properties"); + return; + } + + } else { + // restore storage policy + try { + policyMgr.createStoragePolicy(backupStoragePolicy); + } catch (Exception e) { + LOG.error("restore storage policy fail should not happen", e); + } + storagePolicies.add(backupStoragePolicy); + } + } } private boolean genFileMappingWhenBackupReplicasEqual(PartitionInfo localPartInfo, Partition localPartition, @@ -1326,6 +1484,11 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica); + String storagePolicy = ""; + if (reserveStoragePolicy) { + storagePolicy = localTbl.getPartitionInfo() + .getDataProperty(restorePart.getId()).getStoragePolicy(); + } CreateReplicaTask task = new CreateReplicaTask(restoreReplica.getBackendIdWithoutException(), dbId, localTbl.getId(), restorePart.getId(), restoredIdx.getId(), restoreTablet.getId(), restoreReplica.getId(), indexMeta.getShortKeyColumnCount(), @@ -1338,7 +1501,8 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc localTbl.getPartitionInfo().getTabletType(restorePart.getId()), null, localTbl.getCompressionType(), - localTbl.getEnableUniqueKeyMergeOnWrite(), localTbl.getStoragePolicy(), + localTbl.getEnableUniqueKeyMergeOnWrite(), + storagePolicy, localTbl.disableAutoCompaction(), localTbl.enableSingleReplicaCompaction(), localTbl.skipWriteIndexOnLoad(), @@ -1553,6 +1717,22 @@ private void replayCheckAndPrepareMeta() { } } + // restored resource + ResourceMgr resourceMgr = Env.getCurrentEnv().getResourceMgr(); + for (Resource resource : restoredResources) { + resourceMgr.replayCreateResource(resource); + } + + // restored storage policy + PolicyMgr policyMgr = Env.getCurrentEnv().getPolicyMgr(); + for (StoragePolicy storagePolicy : storagePolicies) { + Optional localPolicy = policyMgr.findPolicy(storagePolicy.getPolicyName(), + PolicyTypeEnum.STORAGE); + if (!localPolicy.isPresent()) { + policyMgr.replayCreate(storagePolicy); + } + } + // restored partitions for (Pair entry : restoredPartitions) { OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first); @@ -1624,12 +1804,6 @@ private void replayCheckAndPrepareMeta() { } } - // restored resource - ResourceMgr resourceMgr = Env.getCurrentEnv().getResourceMgr(); - for (Resource resource : restoredResources) { - resourceMgr.replayCreateResource(resource); - } - LOG.info("replay check and prepare meta. {}", this); } @@ -2100,6 +2274,7 @@ private Status allTabletCommitted(boolean isReplay) { restoredPartitions.clear(); restoredTbls.clear(); restoredResources.clear(); + storagePolicies.clear(); // release snapshot before clearing snapshotInfos releaseSnapshots(); @@ -2367,6 +2542,13 @@ private void cancelInternal(boolean isReplay) { LOG.info("remove restored resource when cancelled: {}", resource.getName()); resourceMgr.dropResource(resource); } + + // remove restored storage policy + PolicyMgr policyMgr = Env.getCurrentEnv().getPolicyMgr(); + for (StoragePolicy storagePolicy : storagePolicies) { + LOG.info("remove restored storage polciy when cancelled: {}", storagePolicy.getName()); + policyMgr.replayDrop(storagePolicy); + } } if (!isReplay) { @@ -2647,6 +2829,8 @@ private void readOthers(DataInput in) throws IOException { isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES)); isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS)); isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE)); + storageResource = properties.get(PROP_STORAGE_RESOURCE); + reserveStoragePolicy = Boolean.parseBoolean(properties.get(PROP_RESERVE_STORAGE_POLICY)); } @Override @@ -2657,6 +2841,8 @@ public void gsonPostProcess() throws IOException { isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES)); isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS)); isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE)); + storageResource = properties.get(PROP_STORAGE_RESOURCE); + reserveStoragePolicy = Boolean.parseBoolean(properties.get(PROP_RESERVE_STORAGE_POLICY)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 92d9aa4e9c7f82..ed7dbe3376a912 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -769,7 +769,7 @@ public void resetVersionForRestore() { } public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restoreReplicaAlloc, - boolean reserveReplica, String srcDbName) { + boolean reserveReplica, boolean reserveStoragePolicy, String srcDbName) { // ATTN: The meta of the restore may come from different clusters, so the // original ID in the meta may conflict with the ID of the new cluster. For // example, if a newly allocated ID happens to be the same as an original ID, @@ -818,7 +818,7 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore boolean isSinglePartition = partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST; partitionInfo.resetPartitionIdForRestore(partitionMap, - reserveReplica ? null : restoreReplicaAlloc, isSinglePartition); + reserveReplica ? null : restoreReplicaAlloc, reserveStoragePolicy, isSinglePartition); // for each partition, reset rollup index map Map nextIndexes = Maps.newHashMap(); @@ -2011,7 +2011,7 @@ public OlapTable selectiveCopy(Collection reservedPartitions, IndexExtSt // set storage medium to HDD for backup job, because we want that the backuped table // can be able to restored to another Doris cluster without SSD disk. // But for other operation such as truncate table, keep the origin storage medium. - copied.getPartitionInfo().setDataProperty(partition.getId(), new DataProperty(TStorageMedium.HDD)); + copied.getPartitionInfo().getDataProperty(partition.getId()).setStorageMedium(TStorageMedium.HDD); } for (MaterializedIndex idx : partition.getMaterializedIndices(extState)) { idx.setState(IndexState.NORMAL); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java index 304a105b8cf92d..059c34bb8a03d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -416,7 +416,7 @@ public void moveFromTempToFormal(long tempPartitionId) { public void resetPartitionIdForRestore( Map partitionIdMap, - ReplicaAllocation restoreReplicaAlloc, boolean isSinglePartitioned) { + ReplicaAllocation restoreReplicaAlloc, boolean reserveStoragePolicy, boolean isSinglePartitioned) { Map origIdToDataProperty = idToDataProperty; Map origIdToReplicaAllocation = idToReplicaAllocation; Map origIdToItem = idToItem; @@ -429,7 +429,8 @@ public void resetPartitionIdForRestore( idToStoragePolicy = Maps.newHashMap(); for (Map.Entry entry : partitionIdMap.entrySet()) { - idToDataProperty.put(entry.getKey(), origIdToDataProperty.get(entry.getValue())); + idToDataProperty.put(entry.getKey(), reserveStoragePolicy ? origIdToDataProperty.get(entry.getValue()) : + new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM)); idToReplicaAllocation.put(entry.getKey(), restoreReplicaAlloc == null ? origIdToReplicaAllocation.get(entry.getValue()) : restoreReplicaAlloc); @@ -437,7 +438,8 @@ public void resetPartitionIdForRestore( idToItem.put(entry.getKey(), origIdToItem.get(entry.getValue())); } idToInMemory.put(entry.getKey(), origIdToInMemory.get(entry.getValue())); - idToStoragePolicy.put(entry.getKey(), origIdToStoragePolicy.get(entry.getValue())); + idToStoragePolicy.put(entry.getKey(), reserveStoragePolicy + ? origIdToStoragePolicy.get(entry.getValue()) : ""); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java index e6fac7f07d81ad..e0fee915eaa935 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java @@ -82,6 +82,13 @@ public void createResource(CreateResourceStmt stmt) throws DdlException { } } + public void createResource(Resource resource) throws DdlException { + if (createResource(resource, false)) { + Env.getCurrentEnv().getEditLog().logCreateResource(resource); + LOG.info("Create resource success. Resource: {}", resource.getName()); + } + } + // Return true if the resource is truly added, // otherwise, return false or throw exception. public boolean createResource(Resource resource, boolean ifNotExists) throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java index a40e91f47d46d5..0e4c36ba19504f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java @@ -33,11 +33,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.zip.Adler32; /** * S3 resource @@ -239,5 +241,40 @@ protected void getProcNodeData(BaseProcResult result) { } readUnlock(); } + + public int getSignature(int signatureVersion) { + Adler32 adler32 = new Adler32(); + adler32.update(signatureVersion); + final String charsetName = "UTF-8"; + + try { + // table name + adler32.update(name.getBytes(charsetName)); + if (LOG.isDebugEnabled()) { + LOG.debug("signature. view name: {}", name); + } + // type + adler32.update(type.name().getBytes(charsetName)); + if (LOG.isDebugEnabled()) { + LOG.debug("signature. view type: {}", type.name()); + } + // configs + for (Map.Entry config : properties.entrySet()) { + adler32.update(config.getKey().getBytes(charsetName)); + adler32.update(config.getValue().getBytes(charsetName)); + if (LOG.isDebugEnabled()) { + LOG.debug("signature. view config: {}", config); + } + } + } catch (UnsupportedEncodingException e) { + LOG.error("encoding error", e); + return -1; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("signature: {}", Math.abs((int) adler32.getValue())); + } + return Math.abs((int) adler32.getValue()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java index 01c5399d4ab974..4867a5c62f9676 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java @@ -99,6 +99,10 @@ public Policy(long id, final PolicyTypeEnum type, final String policyName) { this.version = 0; } + public String getName() { + return policyName; + } + /** * Trans stmt to Policy. **/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java index 6e8bd4f08cb2f7..772ffc5adab71d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java @@ -146,6 +146,24 @@ public void createPolicy(CreatePolicyStmt stmt) throws UserException { } } + /** + * Create policy through StoragePolicy. + **/ + public void createStoragePolicy(StoragePolicy storagePolicy) throws UserException { + Map pros = Maps.newConcurrentMap(); + if (storagePolicy.getCooldownTimestampMs() != -1) { + pros.put(StoragePolicy.COOLDOWN_DATETIME, String.valueOf(storagePolicy.getCooldownTimestampMs())); + } + if (storagePolicy.getCooldownTtl() != -1) { + pros.put(StoragePolicy.COOLDOWN_TTL, String.valueOf(storagePolicy.getCooldownTtl())); + } + pros.put(StoragePolicy.STORAGE_RESOURCE, storagePolicy.getStorageResource()); + + CreatePolicyStmt stmt = new CreatePolicyStmt(storagePolicy.getType(), true, + storagePolicy.getPolicyName(), pros); + createPolicy(stmt); + } + /** * Create policy through http api. **/ @@ -272,7 +290,7 @@ public List getCopiedPoliciesByType(PolicyTypeEnum policyType) { } } - private List getPoliciesByType(PolicyTypeEnum policyType) { + public List getPoliciesByType(PolicyTypeEnum policyType) { if (typeToPolicyMap == null) { return new ArrayList<>(); } @@ -319,6 +337,11 @@ private void unprotectedAdd(Policy policy) { } + public void replayDrop(StoragePolicy policy) { + DropPolicyLog log = new DropPolicyLog(policy.getType(), policy.getPolicyName()); + replayDrop(log); + } + public void replayDrop(DropPolicyLog log) { // for compatible if (log.getType() == PolicyTypeEnum.ROW && StringUtils.isEmpty(log.getCtlName())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java index ba2e0d5c59218e..87bfed4c2252e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java @@ -25,8 +25,12 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.property.constants.S3Properties; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ShowResultSetMetaData; import com.google.common.base.Strings; @@ -37,18 +41,22 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.time.LocalDateTime; import java.time.format.DateTimeParseException; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.zip.Adler32; /** * Save policy for storage migration. **/ @Data -public class StoragePolicy extends Policy { +public class StoragePolicy extends Policy implements Writable, GsonPostProcessable { public static final String DEFAULT_STORAGE_POLICY_NAME = "default_storage_policy"; public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName, Optional defaultPolicy) @@ -185,6 +193,20 @@ public void init(final Map props, boolean ifNotExists) throws An } } + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + /** + * Read Policy from file. + **/ + public static StoragePolicy read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, StoragePolicy.class); + } + private static Resource checkResourceIsExist(final String storageResource) throws AnalysisException { Resource resource = Optional.ofNullable(Env.getCurrentEnv().getResourceMgr().getResource(storageResource)) @@ -391,4 +413,37 @@ public boolean removeResourceReference() { } return false; } + + public int getSignature(int signatureVersion) { + Adler32 adler32 = new Adler32(); + adler32.update(signatureVersion); + final String charsetName = "UTF-8"; + + //ignore check id, version, cooldownTimestampMs, cooldownTtl + try { + // policy name + adler32.update(policyName.getBytes(charsetName)); + if (LOG.isDebugEnabled()) { + LOG.debug("signature. policy name: {}", policyName); + } + // storageResource name + adler32.update(storageResource.getBytes(charsetName)); + if (LOG.isDebugEnabled()) { + LOG.debug("signature. storageResource name: {}", storageResource); + } + // type + adler32.update(String.valueOf(getType()).getBytes(charsetName)); + if (LOG.isDebugEnabled()) { + LOG.debug("signature. type : {}", getType()); + } + } catch (UnsupportedEncodingException e) { + LOG.error("encoding error", e); + return -1; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("signature: {}", Math.abs((int) adler32.getValue())); + } + return Math.abs((int) adler32.getValue()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1ad8d733ddea07..0db95b79815e65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -28,6 +28,8 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.backup.AbstractJob; +import org.apache.doris.backup.BackupJob; import org.apache.doris.backup.Snapshot; import org.apache.doris.catalog.AutoIncrementGenerator; import org.apache.doris.catalog.Column; @@ -343,6 +345,18 @@ public TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(TConfirmUnusedRe LOG.warn("tablet {} not found", info.tablet_id); return; } + + AbstractJob job = Env.getCurrentEnv().getBackupHandler().getJob(tabletMeta.getDbId()); + + if (job != null && job instanceof BackupJob) { + BackupJob backupJob = (BackupJob) job; + if (!backupJob.isDone() + && backupJob.getBackupMeta().getTable((tabletMeta.getTableId())) != null) { + LOG.warn("Backup is running on this tablet {} ", info.tablet_id); + return; + } + } + Tablet tablet; int replicaNum; try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java index ba564599029994..cb929668bf5a71 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java @@ -43,6 +43,7 @@ import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.persist.EditLog; +import org.apache.doris.policy.StoragePolicy; import org.apache.doris.task.DirMoveTask; import org.apache.doris.task.DownloadTask; import org.apache.doris.task.SnapshotTask; @@ -213,7 +214,8 @@ public Status getSnapshotInfoFile(String label, String backupTimestamp, List tbls = Lists.newArrayList(); tbls.add(tbl); List resources = Lists.newArrayList(); - BackupMeta backupMeta = new BackupMeta(tbls, resources); + List storagePolicys = Lists.newArrayList(); + BackupMeta backupMeta = new BackupMeta(tbls, resources, storagePolicys); Map snapshotInfos = Maps.newHashMap(); for (Partition part : tbl.getPartitions()) { for (MaterializedIndex idx : part.getMaterializedIndices(IndexExtState.VISIBLE)) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index dadfdb632e394d..4fd6ccb1643764 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -42,6 +42,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.persist.EditLog; +import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.Tag; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -256,13 +257,14 @@ boolean await(long timeout, TimeUnit unit) { db.unregisterTable(expectedRestoreTbl.getName()); job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false, - new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, + new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, null, false, env, repo.getId()); List
tbls = Lists.newArrayList(); List resources = Lists.newArrayList(); + List storagePolicys = Lists.newArrayList(); tbls.add(expectedRestoreTbl); - backupMeta = new BackupMeta(tbls, resources); + backupMeta = new BackupMeta(tbls, resources, storagePolicys); } @Test diff --git a/regression-test/suites/backup_restore/test_backup_restore_cold_data.groovy b/regression-test/suites/backup_restore/test_backup_restore_cold_data.groovy new file mode 100644 index 00000000000000..bca2d05b226ed6 --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_restore_cold_data.groovy @@ -0,0 +1,1506 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backup_cooldown", "backup_cooldown_data") { + + String suiteName = "test_backup_cooldown" + String resource_name1 = "resource_${suiteName}_1" + String policy_name1 = "policy_${suiteName}_1" + String resource_name2 = "resource_${suiteName}_2" + String policy_name2 = "policy_${suiteName}_2" + String dbName = "${suiteName}_db" + String tableName = "${suiteName}_table" + String snapshotName = "${suiteName}_snapshot" + String repoName = "${suiteName}_repo" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + + + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_name1}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown1", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_name2}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown2", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name1} + PROPERTIES( + "storage_resource" = "${resource_name1}", + "cooldown_ttl" = "10" + ) + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name2} + PROPERTIES( + "storage_resource" = "${resource_name2}", + "cooldown_ttl" = "10" + ) + """ + + //generate_cooldown_task_interval_sec default is 20 + + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + + sql """ + CREATE TABLE ${dbName}.${tableName} + ( + k1 BIGINT, + v1 VARCHAR(48), + INDEX idx1 (v1) USING INVERTED PROPERTIES("parser" = "english") + ) + DUPLICATE KEY(k1) + PARTITION BY RANGE(`k1`) + ( + PARTITION p201701 VALUES [(0), (3)) ("storage_policy" = "${policy_name1}"), + PARTITION `p201702` VALUES LESS THAN (6)("storage_policy" = "${policy_name2}"), + PARTITION `p2018` VALUES [(6),(100)) + ) + DISTRIBUTED BY HASH (k1) BUCKETS 3 + PROPERTIES( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + List values = [] + for (int i = 1; i <= 10; ++i) { + values.add("(${i}, ${i})") + } + sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}" + def result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + int count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + + + } + + assertNotEquals('0.000 ', result[0][5].toString()) + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableName}) + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + sql "DROP TABLE ${dbName}.${tableName}" + + sql """ + drop storage policy ${policy_name1}; + """ + + sql """ + drop storage policy ${policy_name2}; + """ + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after r0 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + + + } + + //cleanup + sql "DROP TABLE ${dbName}.${tableName} FORCE" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" + + sql """ + drop storage policy ${policy_name1}; + """ + + sql """ + drop resource ${resource_name1}; + """ + + sql """ + drop storage policy ${policy_name2}; + """ + + sql """ + drop resource ${resource_name2}; + """ +} + +// test restore back to old instance +suite("test_backup_cooldown_1", "backup_cooldown_data") { + + String suiteName = "test_backup_cooldown_1" + String resource_name1 = "resource_${suiteName}_1" + String policy_name1 = "policy_${suiteName}_1" + String resource_name2 = "resource_${suiteName}_2" + String policy_name2 = "policy_${suiteName}_2" + String dbName = "${suiteName}_db" + String tableName = "${suiteName}_table" + String snapshotName = "${suiteName}_snapshot" + String repoName = "${suiteName}_repo" + def found = 0 + def records + def result + def row + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + + + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_name1}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown1", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_name2}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown2", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name1} + PROPERTIES( + "storage_resource" = "${resource_name1}", + "cooldown_ttl" = "10" + ) + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name2} + PROPERTIES( + "storage_resource" = "${resource_name2}", + "cooldown_ttl" = "10" + ) + """ + + //generate_cooldown_task_interval_sec default is 20 + + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + + sql """ + CREATE TABLE ${dbName}.${tableName} + ( + k1 BIGINT, + v1 VARCHAR(48), + INDEX idx1 (v1) USING INVERTED PROPERTIES("parser" = "english") + ) + DUPLICATE KEY(k1) + PARTITION BY RANGE(`k1`) + ( + PARTITION p201701 VALUES [(0), (3)) ("storage_policy" = "${policy_name1}"), + PARTITION `p201702` VALUES LESS THAN (6)("storage_policy" = "${policy_name2}"), + PARTITION `p2018` VALUES [(6),(100)) + ) + DISTRIBUTED BY HASH (k1) BUCKETS 3 + PROPERTIES( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + List values = [] + for (int i = 1; i <= 10; ++i) { + values.add("(${i}, ${i})") + } + sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}" + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + int count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + + assertNotEquals('0.000 ', result[0][5].toString()) + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableName}) + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + // 1 老表存在的情况 + // 1.1 restore 不指定。预期失败, 不支持将冷热属性的表恢复到已存在的表中。 + // 1.2 restore 指定 ("reserve_storage_policy"="true"), 预期失败, 不支持将冷热属性的表恢复到已存在的表中。 + // 1.3 restore 指定 ("reserve_storage_policy"="false"), 预期成功,且不落冷 + + // 2 删除老表 + // 2.1 restore 不指定 预期成功,且落冷 + // 2.2 restore 指定 ("reserve_storage_policy"="true")预期成功,且落冷 + // 2.3 restore 指定 ("reserve_storage_policy"="false")预期成功,且不落冷 + + + // 3 删除老表和policy + // 3.1 restore 不指定 预期成功,且落冷 + // 3.2 restore 指定 ("reserve_storage_policy"="true")预期成功,且落冷 + // 3.3 restore 指定 ("reserve_storage_policy"="false")预期成功,且不落冷 + + // 4 删除老表和resource、policy + // 4.1 restore 不指定 预期失败,resource不存在 + // 4.2 restore 指定 ("reserve_storage_policy"="true")预期失败,resource不存在 + // 4.3 restore 指定 ("reserve_storage_policy"="false")预期成功,且不落冷 + + + // 1. old table exist + // 1.1 restore normal fail + // 1.2 restore with("reserve_storage_policy"="true") fail + // 1.3 restore with("reserve_storage_policy"="false") success and don't cooldown + logger.info(" ====================================== 1.1 ==================================== ") + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("Can't restore remote partition")) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + + logger.info(" ====================================== 1.2 ==================================== ") + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_storage_policy"="true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("Can't restore remote partition")) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + + logger.info(" ====================================== 1.3 ==================================== ") + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_storage_policy"="false" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("Can't restore remote partition")) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + + // 2. drop old table + // 2.1 restore normal success and cooldown + // 2.2 restore with ("reserve_storage_policy"="true")success and cooldown + // 2.3 restore with ("reserve_storage_policy"="false")success and don't cooldown + logger.info(" ====================================== 2.1 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + assertNotEquals('0.000 ', result[0][5].toString()) + + + logger.info(" ====================================== 2.2 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_storage_policy"="true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + assertNotEquals('0.000 ', result[0][5].toString()) + + + logger.info(" ====================================== 2.3 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_storage_policy"="false" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // check table don't have storage_policy + records = sql_return_maparray "show storage policy using" + found = 0 + for (def res2 : records) { + if (res2.Database.equals(dbName) && res2.Table.equals(tableName)) { + found = 1 + break + } + } + assertEquals(found, 0) + + + // 3. drop old table and policy + // 3.1 restore normal success and cooldown + // 3.2 restore with("reserve_storage_policy"="true")success and cooldown + // 3.3 restore with("reserve_storage_policy"="false")success and don't cooldown + logger.info(" ====================================== 3.1 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + assertNotEquals('0.000 ', result[0][5].toString()) + + + logger.info(" ====================================== 3.2 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_storage_policy"="true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + assertNotEquals('0.000 ', result[0][5].toString()) + + + logger.info(" ====================================== 3.3 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_storage_policy"="false" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // check table don't have storage_policy + records = sql_return_maparray "show storage policy using" + found = 0 + for (def res2 : records) { + if (res2.Database.equals(dbName) && res2.Table.equals(tableName)) { + found = 1 + break + } + } + assertEquals(found, 0) + + // check storage policy ${policy_name1} not exist + records = sql_return_maparray "show storage policy" + found = 0 + for (def res2 : records) { + if (res2.PolicyName.equals(policy_name1)) { + found = 1 + break + } + } + assertEquals(found, 0) + + // check resource ${resource_name1} not exist + records = sql_return_maparray "show storage policy" + found = 0 + for (def res2 : records) { + if (res2.Name.equals(resource_name1)) { + found = 1 + break + } + } + assertEquals(found, 0) + + + // 4. drop old table and resource and policy + // 4.1 restore normal fail + // 4.2 restore with("reserve_storage_policy"="true") fail + // 4.3 restore with("reserve_storage_policy"="false")success and don't cooldown + logger.info(" ====================================== 4.1 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + try_sql """ + drop resource ${resource_name1}; + """ + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + // restore failed with local restore is not exist + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("is not exist")) + + + logger.info(" ====================================== 4.2 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + try_sql """ + drop resource ${resource_name1}; + """ + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_storage_policy"="true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + // restore failed with local restore is not exist + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("is not exist")) + + + logger.info(" ====================================== 4.3 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + try_sql """ + drop resource ${resource_name1}; + """ + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_storage_policy"="false" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // check table don't have storage_policy + records = sql_return_maparray "show storage policy using" + found = 0 + for (def res2 : records) { + if (res2.Database.equals(dbName) && res2.Table.equals(tableName)) { + found = 1 + break + } + } + assertEquals(found, 0) + + // check storage policy ${policy_name1} not exist + records = sql_return_maparray "show storage policy" + found = 0 + for (def res2 : records) { + if (res2.PolicyName.equals(policy_name1)) { + found = 1 + break + } + } + assertEquals(found, 0) + + // check resource ${resource_name1} not exist + records = sql_return_maparray "show storage policy" + found = 0 + for (def res2 : records) { + if (res2.Name.equals(resource_name1)) { + found = 1 + break + } + } + assertEquals(found, 0) + + // 5. alter policy and success + logger.info(" ====================================== 5.1 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop resource ${resource_name1}; + """ + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_name1}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown1", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + sql """ + ALTER STORAGE POLICY ${policy_name2} PROPERTIES ("cooldown_ttl" = "11"); + """ + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + assertNotEquals('0.000 ', result[0][5].toString()) + + // check storage policy ${policy_name2} exist + records = sql_return_maparray "show storage policy" + found = 0 + for (def res2 : records) { + if (res2.PolicyName.equals(policy_name2) && res2.CooldownTtl.equals("11")) { + found = 1 + break + } + } + assertEquals(found, 1) + + + + //cleanup + sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" + + sql """ + drop storage policy ${policy_name1}; + """ + + sql """ + drop resource ${resource_name1}; + """ + + sql """ + drop storage policy ${policy_name2}; + """ + + sql """ + drop resource ${resource_name2}; + """ +} + + + + + +// test restore back to a new instance +suite("test_backup_cooldown_2", "backup_cooldown_data") { + + String suiteName = "test_backup_cooldown_2" + String resource_name1 = "resource_${suiteName}_1" + String policy_name1 = "policy_${suiteName}_1" + String resource_name2 = "resource_${suiteName}_2" + String resource_new_name = "resource_${suiteName}_new" + String policy_name2 = "policy_${suiteName}_2" + String dbName = "${suiteName}_db" + String tableName = "${suiteName}_table" + String snapshotName = "${suiteName}_snapshot" + String repoName = "${suiteName}_repo" + def found = 0 + def records + def syncer = getSyncer() + def result + syncer.createS3Repository(repoName) + + + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_name1}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown1", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_name2}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown2", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resource_new_name}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "AWS_ROOT_PATH" = "regression/cooldown3", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "AWS_BUCKET" = "${getS3BucketName()}", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name1} + PROPERTIES( + "storage_resource" = "${resource_name1}", + "cooldown_ttl" = "10" + ) + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name2} + PROPERTIES( + "storage_resource" = "${resource_name2}", + "cooldown_ttl" = "10" + ) + """ + + //generate_cooldown_task_interval_sec default is 20 + + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + + sql """ + CREATE TABLE ${dbName}.${tableName} + ( + k1 BIGINT, + v1 VARCHAR(48), + INDEX idx1 (v1) USING INVERTED PROPERTIES("parser" = "english") + ) + DUPLICATE KEY(k1) + PARTITION BY RANGE(`k1`) + ( + PARTITION p201701 VALUES [(0), (3)) ("storage_policy" = "${policy_name1}"), + PARTITION `p201702` VALUES LESS THAN (6)("storage_policy" = "${policy_name2}"), + PARTITION `p2018` VALUES [(6),(100)) + ) + DISTRIBUTED BY HASH (k1) BUCKETS 3 + PROPERTIES( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + List values = [] + for (int i = 1; i <= 10; ++i) { + values.add("(${i}, ${i})") + } + sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}" + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + int count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + assertNotEquals('0.000 ', result[0][5].toString()) + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableName}) + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + // 1 老表存在的情况 + // 1.1 restore 指定 ("storage_resource"="resource_name_exist"), 预期失败,不支持将冷热属性的表恢复到已存在的表中。 + // 1.2 restore 指定 ("storage_resource"="resource_name_not_exist"), 预期失败,resource不存在 + // 1.3 restore 指定 ("storage_resource"="resource_new_name"), 预期失败,不支持将冷热属性的表恢复到已存在的表中。 + + + // 2 删除表 + // 2.1 restore 指定 ("storage_resource"="resource_name_exist"), 预期失败,resource路径需不一致 + // 2.2 restore 指定 ("storage_resource"="resource_name_not_exist"), 预期失败,resource不存在 + // 2.3 restore 指定 ("storage_resource"="resource_new_name"), storage policy 存在失败 + + + // 3 删除表和policy + // 3.1 restore 指定 ("storage_resource"="resource_name_not_exist"), 预期失败,resource不存在 + // 3.2 restore 指定 ("storage_resource"="resource_new_name"), 成功 + + + + // 4 删除表和policy 同时指定storage_resource和reserve_storage_policy + // 4.1 restore 指定 ("storage_resource"="resource_name_not_exist", "reserve_storage_policy"="true"), 预期失败,resource不存在 + // 4.2 restore 指定 ("storage_resource"="resource_name_not_exist", "reserve_storage_policy"="false"), 预期失败,resource不存在 + // 4.3 restore 指定 ("storage_resource"="resource_new_name", "reserve_storage_policy"="true"), 预期成功,且落冷 + // 4.4 restore 指定 ("storage_resource"="resource_new_name", "reserve_storage_policy"="false"), 预期成功,且不落冷 + + + + // 1 old table exist + // 1.1 restore with ("storage_resource"="resource_name1") fail + // 1.2 restore with ("storage_resource"="resource_name_not_exist") fail + // 1.3 restore with ("storage_resource"="resource_new_name") fail + logger.info(" ====================================== 1.1 ==================================== ") + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_name1}" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("Can't restore remote partition")) + + + + + logger.info(" ====================================== 1.2 ==================================== ") + def fail_restore_1 = try_sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="resource_name_not_exist" + ) + """ + + logger.info("fail_restore_1: ${fail_restore_1}") + + assertEquals(fail_restore_1, null) + + logger.info(" ====================================== 1.3 ==================================== ") + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_new_name}" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("Can't restore remote partition")) + + + + + // 2 drop old table + // 2.1 restore with ("storage_resource"="resource_name_exist")fail + // 2.2 restore with ("storage_resource"="resource_name_not_exist") fail + // 2.3 restore with ("storage_resource"="resource_new_name")fail + logger.info(" ====================================== 2.1 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_name1}" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("should not same as restored resource root path")) + + + + + logger.info(" ====================================== 2.2 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + def fail_restore_2 = try_sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="resource_name_not_exist" + ) + """ + + assertEquals(fail_restore_2, null) + + + logger.info(" ====================================== 2.3 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_new_name}" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + // restore failed + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("already exist but with different properties")) + + // 3 drop table and resource and policy + // 3.1 restore with ("storage_resource"="resource_name_not_exist") fail + // 3.2 restore with ("storage_resource"="resource_new_name") success and cooldown + logger.info(" ====================================== 3.1 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + try_sql """ + drop storage policy ${policy_name2}; + """ + def fail_restore_3 = try_sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="resource_name_not_exist" + ) + """ + + assertEquals(fail_restore_3, null) + + logger.info(" ====================================== 3.2 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + try_sql """ + drop storage policy ${policy_name2}; + """ + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_new_name}" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + assertNotEquals('0.000 ', result[0][5].toString()) + + // check plocy_name1 storage_resource change to resource_new_name + records = sql_return_maparray "show storage policy" + found = 0 + for (def res2 : records) { + if (res2.StorageResource.equals(resource_new_name) && res2.PolicyName.equals(policy_name1)) { + found = 1 + break + } + } + assertEquals(found, 1) + + // check plocy_name2 storage_resource change to resource_new_name + records = sql_return_maparray "show storage policy" + found = 0 + for (def res2 : records) { + if (res2.StorageResource.equals(resource_new_name) && res2.PolicyName.equals(policy_name2)) { + found = 1 + break + } + } + assertEquals(found, 1) + + + + // 4 drop table/resource/policy, set both storage_resource and reserve_storage_policy + // 4.1 restore with ("storage_resource"="resource_name_not_exist", "reserve_storage_policy"="true") fail + // 4.2 restore with ("storage_resource"="resource_name_not_exist", "reserve_storage_policy"="false") fail + // 4.3 restore with ("storage_resource"="resource_new_name", "reserve_storage_policy"="true") success and cooldown + // 4.4 restore with ("storage_resource"="resource_new_name", "reserve_storage_policy"="false") success and don't cooldown + logger.info(" ====================================== 4.1 ==================================== ") + sql "DROP TABLE if exists ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + try_sql """ + drop storage policy ${policy_name2}; + """ + def fail_restore_4 = try_sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="resource_name_not_exist", + "reserve_storage_policy"="true" + ) + """ + + assertEquals(fail_restore_4, null) + + + logger.info(" ====================================== 4.2 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + try_sql """ + drop storage policy ${policy_name2}; + """ + + def fail_restore_5 = try_sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="resource_name_not_exist", + "reserve_storage_policy"="false" + ) + """ + + assertEquals(fail_restore_5, null) + + + logger.info(" ====================================== 4.3 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + try_sql """ + drop storage policy ${policy_name2}; + """ + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_new_name}", + "reserve_storage_policy"="true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + // check plocy_name1 storage_resource change to resource_new_name + records = sql_return_maparray "show storage policy" + found = 0 + for (def res2 : records) { + if (res2.StorageResource.equals(resource_new_name) && res2.PolicyName.equals(policy_name1)) { + found = 1 + break + } + } + assertEquals(found, 1) + + // check plocy_name2 storage_resource change to resource_new_name + records = sql_return_maparray "show storage policy" + found = 0 + for (def res2 : records) { + if (res2.StorageResource.equals(resource_new_name) && res2.PolicyName.equals(policy_name2)) { + found = 1 + break + } + } + assertEquals(found, 1) + + // wait cooldown + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + count = 0; + while (sqlResult.contains("0.00")) { + if (++count >= 360) { // 30min + logger.error('cooldown task is timeouted') + throw new Exception("cooldown task is timeouted after 30 mins") + } + Thread.sleep(5000) + + result = sql "show data FROM ${dbName}.${tableName}" + sqlResult = result[0][5].toString(); + } + assertNotEquals('0.000 ', result[0][5].toString()) + + + + logger.info(" ====================================== 4.4 ==================================== ") + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + try_sql """ + drop storage policy ${policy_name1}; + """ + try_sql """ + drop storage policy ${policy_name2}; + """ + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "storage_resource"="${resource_new_name}", + "reserve_storage_policy"="false" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + + // check table don't have storage_policy + records = sql_return_maparray "show storage policy using" + found = 0 + for (def res2 : records) { + if (res2.Database.equals(dbName) && res2.Table.equals(tableName)) { + found = 1 + break + } + } + assertEquals(found, 0) + + + //cleanup + sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" + + try_sql """ + drop storage policy ${policy_name1}; + """ + + try_sql """ + drop resource ${resource_name1}; + """ + + try_sql """ + drop storage policy ${policy_name2}; + """ + + try_sql """ + drop resource ${resource_name2}; + """ +}