diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index 746f109ac6d7fd..a0b0a2da9c213e 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -20,6 +20,8 @@ #include #include +// clang-format off + // meta-service's bvars BvarLatencyRecorderWithTag g_bvar_ms_begin_txn("ms", "begin_txn"); BvarLatencyRecorderWithTag g_bvar_ms_precommit_txn("ms", "precommit_txn"); @@ -71,23 +73,27 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_copy_files("ms", "get_copy_files"); BvarLatencyRecorderWithTag g_bvar_ms_filter_copy_files("ms", "filter_copy_files"); BvarLatencyRecorderWithTag g_bvar_ms_update_delete_bitmap("ms", "update_delete_bitmap"); BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap("ms", "get_delete_bitmap"); -BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms", - "get_delete_bitmap_update_lock"); +BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms", "get_delete_bitmap_update_lock"); BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap("ms", "remove_delete_bitmap"); -BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock( - "ms", "remove_delete_bitmap_update_lock"); +BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock("ms", "remove_delete_bitmap_update_lock"); BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance"); BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach"); BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress"); BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id"); - BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job"); BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job"); BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", "get_cluster_status"); BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", "set_cluster_status"); - BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv"); +// recycler's bvars +// TODO: use mbvar for per instance, https://github.com/apache/brpc/blob/master/docs/cn/mbvar_c++.md +BvarStatusWithTag g_bvar_recycler_recycle_index_earlest_ts("recycler", "recycle_index_earlest_ts"); +BvarStatusWithTag g_bvar_recycler_recycle_partition_earlest_ts("recycler", "recycle_partition_earlest_ts"); +BvarStatusWithTag g_bvar_recycler_recycle_rowset_earlest_ts("recycler", "recycle_rowset_earlest_ts"); +BvarStatusWithTag g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycler", "recycle_tmp_rowset_earlest_ts"); +BvarStatusWithTag g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler", "recycle_expired_txn_label_earlest_ts"); + // txn_kv's bvars bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get"); bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get"); @@ -101,107 +107,65 @@ bvar::LatencyRecorder g_bvar_txn_kv_range_remove("txn_kv", "range_remove"); bvar::LatencyRecorder g_bvar_txn_kv_get_read_version("txn_kv", "get_read_version"); bvar::LatencyRecorder g_bvar_txn_kv_get_committed_version("txn_kv", "get_committed_version"); bvar::LatencyRecorder g_bvar_txn_kv_batch_get("txn_kv", "batch_get"); - bvar::Adder g_bvar_txn_kv_get_count_normalized("txn_kv", "get_count_normalized"); - bvar::Adder g_bvar_txn_kv_commit_error_counter; -bvar::Window > g_bvar_txn_kv_commit_error_counter_minute( - "txn_kv", "commit_error", &g_bvar_txn_kv_commit_error_counter, 60); - +bvar::Window > g_bvar_txn_kv_commit_error_counter_minute("txn_kv", "commit_error", &g_bvar_txn_kv_commit_error_counter, 60); bvar::Adder g_bvar_txn_kv_commit_conflict_counter; -bvar::Window > g_bvar_txn_kv_commit_conflict_counter_minute( - "txn_kv", "commit_conflict", &g_bvar_txn_kv_commit_conflict_counter, 60); +bvar::Window > g_bvar_txn_kv_commit_conflict_counter_minute("txn_kv", "commit_conflict", &g_bvar_txn_kv_commit_conflict_counter, 60); +// fdb's bvars const int64_t BVAR_FDB_INVALID_VALUE = -99999999L; bvar::Status g_bvar_fdb_client_count("fdb_client_count", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_configuration_coordinators_count( - "fdb_configuration_coordinators_count", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_configuration_usable_regions("fdb_configuration_usable_regions", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_coordinators_unreachable_count( - "fdb_coordinators_unreachable_count", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_fault_tolerance_count("fdb_fault_tolerance_count", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_average_partition_size_bytes( - "fdb_data_average_partition_size_bytes", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_log_server_space_bytes("fdb_data_log_server_space_bytes", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_moving_data_highest_priority( - "fdb_data_moving_data_highest_priority", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_moving_data_in_flight_bytes( - "fdb_data_moving_data_in_flight_bytes", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_moving_data_in_queue_bytes( - "fdb_data_moving_data_in_queue_bytes", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_moving_total_written_bytes( - "fdb_data_moving_total_written_bytes", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_partition_count("fdb_data_partition_count", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_storage_server_space_bytes( - "fdb_data_storage_server_space_bytes", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_state_min_replicas_remaining( - "fdb_data_state_min_replicas_remaining", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_total_kv_size_bytes("fdb_data_total_kv_size_bytes", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_data_total_disk_used_bytes("fdb_data_total_disk_used_bytes", - BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_configuration_coordinators_count("fdb_configuration_coordinators_count", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_configuration_usable_regions("fdb_configuration_usable_regions", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_coordinators_unreachable_count("fdb_coordinators_unreachable_count", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_fault_tolerance_count("fdb_fault_tolerance_count", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_average_partition_size_bytes("fdb_data_average_partition_size_bytes", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_log_server_space_bytes("fdb_data_log_server_space_bytes", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_moving_data_highest_priority("fdb_data_moving_data_highest_priority", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_moving_data_in_flight_bytes("fdb_data_moving_data_in_flight_bytes", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_moving_data_in_queue_bytes("fdb_data_moving_data_in_queue_bytes", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_moving_total_written_bytes("fdb_data_moving_total_written_bytes", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_partition_count("fdb_data_partition_count", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_storage_server_space_bytes("fdb_data_storage_server_space_bytes", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_state_min_replicas_remaining("fdb_data_state_min_replicas_remaining", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_total_kv_size_bytes("fdb_data_total_kv_size_bytes", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_data_total_disk_used_bytes("fdb_data_total_disk_used_bytes", BVAR_FDB_INVALID_VALUE); bvar::Status g_bvar_fdb_generation("fdb_generation", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_incompatible_connections("fdb_incompatible_connections", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_latency_probe_transaction_start_ns( - "fdb_latency_probe_transaction_start_ns", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_latency_probe_commit_ns("fdb_latency_probe_commit_ns", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_latency_probe_read_ns("fdb_latency_probe_read_ns", - BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_incompatible_connections("fdb_incompatible_connections", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_latency_probe_transaction_start_ns("fdb_latency_probe_transaction_start_ns", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_latency_probe_commit_ns("fdb_latency_probe_commit_ns", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_latency_probe_read_ns("fdb_latency_probe_read_ns", BVAR_FDB_INVALID_VALUE); bvar::Status g_bvar_fdb_machines_count("fdb_machines_count", BVAR_FDB_INVALID_VALUE); bvar::Status g_bvar_fdb_process_count("fdb_process_count", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_qos_worst_data_lag_storage_server_ns( - "fdb_qos_worst_data_lag_storage_server_ns", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_qos_worst_durability_lag_storage_server_ns( - "fdb_qos_worst_durability_lag_storage_server_ns", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_qos_worst_log_server_queue_bytes( - "fdb_qos_worst_log_server_queue_bytes", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_qos_worst_storage_server_queue_bytes( - "fdb_qos_worst_storage_server_queue_bytes", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_conflict_rate_hz("fdb_workload_conflict_rate_hz", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_location_rate_hz("fdb_workload_location_rate_hz", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_keys_read_hz("fdb_workload_keys_read_hz", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_read_bytes_hz("fdb_workload_read_bytes_hz", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_read_rate_hz("fdb_workload_read_rate_hz", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_write_rate_hz("fdb_workload_write_rate_hz", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_written_bytes_hz("fdb_workload_written_bytes_hz", - BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_transactions_started_hz( - "fdb_workload_transactions_started_hz", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_transactions_committed_hz( - "fdb_workload_transactions_committed_hz", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_workload_transactions_rejected_hz( - "fdb_workload_transactions_rejected_hz", BVAR_FDB_INVALID_VALUE); -bvar::Status g_bvar_fdb_client_thread_busyness_percent( - "fdb_client_thread_busyness_percent", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_qos_worst_data_lag_storage_server_ns("fdb_qos_worst_data_lag_storage_server_ns", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_qos_worst_durability_lag_storage_server_ns("fdb_qos_worst_durability_lag_storage_server_ns", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_qos_worst_log_server_queue_bytes("fdb_qos_worst_log_server_queue_bytes", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_qos_worst_storage_server_queue_bytes("fdb_qos_worst_storage_server_queue_bytes", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_conflict_rate_hz("fdb_workload_conflict_rate_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_location_rate_hz("fdb_workload_location_rate_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_keys_read_hz("fdb_workload_keys_read_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_read_bytes_hz("fdb_workload_read_bytes_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_read_rate_hz("fdb_workload_read_rate_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_write_rate_hz("fdb_workload_write_rate_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_written_bytes_hz("fdb_workload_written_bytes_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_transactions_started_hz("fdb_workload_transactions_started_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_transactions_committed_hz("fdb_workload_transactions_committed_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_workload_transactions_rejected_hz("fdb_workload_transactions_rejected_hz", BVAR_FDB_INVALID_VALUE); +bvar::Status g_bvar_fdb_client_thread_busyness_percent("fdb_client_thread_busyness_percent", BVAR_FDB_INVALID_VALUE); // checker's bvars -BvarStatusWithTag g_bvar_checker_num_scanned("checker", "num_scanned"); -BvarStatusWithTag g_bvar_checker_num_scanned_with_segment("checker", - "num_scanned_with_segment"); -BvarStatusWithTag g_bvar_checker_num_check_failed("checker", "num_check_failed"); -BvarStatusWithTag g_bvar_checker_check_cost_s("checker", "check_cost_seconds"); -BvarStatusWithTag g_bvar_checker_enqueue_cost_s("checker", "enqueue_cost_seconds"); -BvarStatusWithTag g_bvar_checker_last_success_time_ms("checker", "last_success_time_ms"); -BvarStatusWithTag g_bvar_checker_instance_volume("checker", "instance_volume"); -BvarStatusWithTag g_bvar_inverted_checker_num_scanned("checker", "num_inverted_scanned"); -BvarStatusWithTag g_bvar_inverted_checker_num_check_failed("checker", - "num_inverted_check_failed"); +BvarStatusWithTag g_bvar_checker_num_scanned("checker", "num_scanned"); +BvarStatusWithTag g_bvar_checker_num_scanned_with_segment("checker", "num_scanned_with_segment"); +BvarStatusWithTag g_bvar_checker_num_check_failed("checker", "num_check_failed"); +BvarStatusWithTag g_bvar_checker_check_cost_s("checker", "check_cost_seconds"); +BvarStatusWithTag g_bvar_checker_enqueue_cost_s("checker", "enqueue_cost_seconds"); +BvarStatusWithTag g_bvar_checker_last_success_time_ms("checker", "last_success_time_ms"); +BvarStatusWithTag g_bvar_checker_instance_volume("checker", "instance_volume"); +BvarStatusWithTag g_bvar_inverted_checker_num_scanned("checker", "num_inverted_scanned"); +BvarStatusWithTag g_bvar_inverted_checker_num_check_failed("checker", "num_inverted_check_failed"); +BvarStatusWithTag g_bvar_inverted_checker_leaked_delete_bitmaps("checker", "leaked_delete_bitmaps"); +BvarStatusWithTag g_bvar_inverted_checker_abnormal_delete_bitmaps("checker", "abnormal_delete_bitmaps"); +BvarStatusWithTag g_bvar_inverted_checker_delete_bitmaps_scanned("checker", "delete_bitmap_keys_scanned"); -BvarStatusWithTag g_bvar_inverted_checker_leaked_delete_bitmaps("checker", - "leaked_delete_bitmaps"); -BvarStatusWithTag g_bvar_inverted_checker_abnormal_delete_bitmaps( - "checker", "abnormal_delete_bitmaps"); -BvarStatusWithTag g_bvar_inverted_checker_delete_bitmaps_scanned( - "checker", "delete_bitmap_keys_scanned"); \ No newline at end of file +// clang-format on diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index d0ad2e97957ae6..93340a6c0d291f 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -26,34 +26,53 @@ #include #include #include - -class BvarLatencyRecorderWithTag { +#include + +/** + * Manage bvars that with similar names (identical prefix) + * ${module}_${name}_${tag} + * where `tag` is added automatically when calling `get` or `put` + */ +template +class BvarWithTag { public: - BvarLatencyRecorderWithTag(std::string module, std::string name) + BvarWithTag(std::string module, std::string name) : module_(std::move(module)), name_(std::move(name)) {} - void put(const std::string& tag, int64_t value) { - std::shared_ptr instance = nullptr; + template + requires std::is_integral_v + void put(const std::string& tag, ValType value) { + std::shared_ptr instance = nullptr; { std::lock_guard l(mutex_); auto it = bvar_map_.find(tag); if (it == bvar_map_.end()) { - instance = std::make_shared(module_, name_ + "_" + tag); + instance = std::make_shared(module_, name_ + "_" + tag, ValType()); bvar_map_[tag] = instance; } else { instance = it->second; } } - (*instance) << value; + // FIXME(gavin): check bvar::Adder and more + if constexpr (std::is_same_v) { + (*instance) << value; + } else if constexpr (is_status) { + instance->set_value(value); + } else { + // This branch mean to be unreachable, add an assert(false) here to + // prevent missing branch match. + // Postpone deduction of static_assert by evaluating sizeof(T) + static_assert(!sizeof(Bvar), "all types must be matched with if constexpr"); + } } - std::shared_ptr get(const std::string& tag) { - std::shared_ptr instance = nullptr; + std::shared_ptr get(const std::string& tag) { + std::shared_ptr instance = nullptr; std::lock_guard l(mutex_); auto it = bvar_map_.find(tag); if (it == bvar_map_.end()) { - instance = std::make_shared(module_, name_ + "_" + tag); + instance = std::make_shared(module_, name_ + "_" + tag); bvar_map_[tag] = instance; return instance; } @@ -69,54 +88,14 @@ class BvarLatencyRecorderWithTag { bthread::Mutex mutex_; std::string module_; std::string name_; - std::map> bvar_map_; + std::map> bvar_map_; }; -template -class BvarStatusWithTag { -public: - BvarStatusWithTag(std::string module, std::string name) - : module_(std::move(module)), name_(std::move(name)) {} - - void put(const std::string& tag, T value) { - std::shared_ptr> instance = nullptr; - { - std::lock_guard l(mutex_); - auto it = bvar_map_.find(tag); - if (it == bvar_map_.end()) { - instance = std::make_shared>(module_, name_ + "_" + tag, T()); - bvar_map_[tag] = instance; - } else { - instance = it->second; - } - } - (*instance).set_value(value); - } - - std::shared_ptr> get(const std::string& tag) { - std::shared_ptr> instance = nullptr; - std::lock_guard l(mutex_); - - auto it = bvar_map_.find(tag); - if (it == bvar_map_.end()) { - instance = std::make_shared>(module_, name_ + "_" + tag); - bvar_map_[tag] = instance; - return instance; - } - return it->second; - } - - void remove(const std::string& tag) { - std::lock_guard l(mutex_); - bvar_map_.erase(tag); - } +using BvarLatencyRecorderWithTag = BvarWithTag; -private: - bthread::Mutex mutex_; - std::string module_; - std::string name_; - std::map>> bvar_map_; -}; +template + requires std::is_integral_v +using BvarStatusWithTag = BvarWithTag, true>; // meta-service's bvars extern BvarLatencyRecorderWithTag g_bvar_ms_begin_txn; @@ -182,6 +161,13 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress; extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id; extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv; +// recycler's bvars +extern BvarStatusWithTag g_bvar_recycler_recycle_index_earlest_ts; +extern BvarStatusWithTag g_bvar_recycler_recycle_partition_earlest_ts; +extern BvarStatusWithTag g_bvar_recycler_recycle_rowset_earlest_ts; +extern BvarStatusWithTag g_bvar_recycler_recycle_tmp_rowset_earlest_ts; +extern BvarStatusWithTag g_bvar_recycler_recycle_expired_txn_label_earlest_ts; + // txn_kv's bvars extern bvar::LatencyRecorder g_bvar_txn_kv_get; extern bvar::LatencyRecorder g_bvar_txn_kv_range_get; diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 03e2b63c72a7f6..4f3c49ee98dfe8 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -66,9 +66,11 @@ CONF_mInt64(dropped_partition_retention_seconds, "10800"); // 3h CONF_Strings(recycle_whitelist, ""); // Comma seprated list // These instances will not be recycled, only effective when whitelist is empty. CONF_Strings(recycle_blacklist, ""); // Comma seprated list +// IO worker thread pool concurrency: object list, delete CONF_mInt32(instance_recycler_worker_pool_size, "32"); CONF_Bool(enable_checker, "false"); // The parallelism for parallel recycle operation +// s3_producer_pool recycle_tablet_pool, delete single object in this pool CONF_Int32(recycle_pool_parallelism, "40"); // Currently only used for recycler test CONF_Bool(enable_inverted_check, "false"); diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 26c8c30110a1db..4967750762da48 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1183,6 +1183,7 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle << ", rowset_id=" << rowset_id << ", rowset_meta_bytes=" << rowset_meta.ByteSizeLong() << ", segment_key_bounds_bytes=" << segment_key_bounds_bytes + << ", num_segments=" << rowset_meta.num_segments() << ", rowset_meta=" << rowset_meta.ShortDebugString(); } code = cast_as(err); diff --git a/cloud/src/recycler/obj_storage_client.h b/cloud/src/recycler/obj_storage_client.h index fc0211820d1a50..b3d5cd4978e7ac 100644 --- a/cloud/src/recycler/obj_storage_client.h +++ b/cloud/src/recycler/obj_storage_client.h @@ -30,9 +30,15 @@ struct ObjectStoragePathRef { }; struct ObjectStorageResponse { - ObjectStorageResponse(int r = 0, std::string msg = "") : ret(r), error_msg(std::move(msg)) {} + enum Code : int { + UNDEFINED = -1, + OK = 0, + NOT_FOUND = 1, + }; + + ObjectStorageResponse(int r = OK, std::string msg = "") : ret(r), error_msg(std::move(msg)) {} // clang-format off - int ret {0}; // To unify the error handle logic with BE, we'd better use the same error code as BE + int ret {OK}; // To unify the error handle logic with BE, we'd better use the same error code as BE // clang-format on std::string error_msg; }; diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index ca22b28e031c91..84d755958ee29c 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -40,6 +40,7 @@ #ifdef UNIT_TEST #include "../test/mock_accessor.h" #endif +#include "common/bvars.h" #include "common/config.h" #include "common/encryption_util.h" #include "common/logging.h" @@ -576,11 +577,11 @@ int InstanceRecycler::init() { template auto task_wrapper(Func... funcs) -> std::function { return [funcs...]() { - return [](std::initializer_list numbers) { + return [](std::initializer_list ret_vals) { int i = 0; - for (int num : numbers) { - if (num != 0) { - i = num; + for (int ret : ret_vals) { + if (ret != 0) { + i = ret; } } return i; @@ -597,11 +598,15 @@ int InstanceRecycler::do_recycle() { fmt::format("instance id {}", instance_id_), [](int r) { return r != 0; }); sync_executor - .add(task_wrapper( + .add(task_wrapper( // dropped table and dropped partition need to be recycled in series + // becase they may both recycle the same set of tablets + // recycle dropped table or idexes(mv, rollup) [this]() -> int { return InstanceRecycler::recycle_indexes(); }, - [this]() -> int { return InstanceRecycler::recycle_partitions(); }, - [this]() -> int { return InstanceRecycler::recycle_tmp_rowsets(); }, - [this]() -> int { return InstanceRecycler::recycle_rowsets(); })) + // recycle dropped partitions + [this]() -> int { return InstanceRecycler::recycle_partitions(); })) + .add(task_wrapper( + [this]() -> int { return InstanceRecycler::recycle_tmp_rowsets(); })) + .add(task_wrapper([this]() -> int { return InstanceRecycler::recycle_rowsets(); })) .add(task_wrapper( [this]() { return InstanceRecycler::abort_timeout_txn(); }, [this]() { return InstanceRecycler::recycle_expired_txn_label(); })) @@ -625,6 +630,11 @@ int InstanceRecycler::do_recycle() { } } +/** + * 1. delete all remote data + * 2. delete all kv + * 3. remove instance kv + */ int InstanceRecycler::recycle_deleted_instance() { LOG_INFO("begin to recycle deleted instance").tag("instance_id", instance_id_); @@ -638,6 +648,29 @@ int InstanceRecycler::recycle_deleted_instance() { << "s, instance_id=" << instance_id_; }); + // delete all remote data + for (auto& [_, accessor] : accessor_map_) { + if (stopped()) { + return ret; + } + + LOG(INFO) << "begin to delete all objects in " << accessor->uri(); + int del_ret = accessor->delete_all(); + if (del_ret == 0) { + LOG(INFO) << "successfully delete all objects in " << accessor->uri(); + } else if (del_ret != 1) { // no need to log, because S3Accessor has logged this error + // If `del_ret == 1`, it can be considered that the object data has been recycled by cloud platform, + // so the recycling has been successful. + ret = -1; + } + } + + if (ret != 0) { + LOG(WARNING) << "failed to delete all data of deleted instance=" << instance_id_; + return ret; + } + + // delete all kv std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -681,22 +714,6 @@ int InstanceRecycler::recycle_deleted_instance() { ret = -1; } - for (auto& [_, accessor] : accessor_map_) { - if (stopped()) { - return ret; - } - - LOG(INFO) << "begin to delete all objects in " << accessor->uri(); - int del_ret = accessor->delete_all(); - if (del_ret == 0) { - LOG(INFO) << "successfully delete all objects in " << accessor->uri(); - } else if (del_ret != 1) { // no need to log, because S3Accessor has logged this error - // If `del_ret == 1`, it can be considered that the object data has been recycled by cloud platform, - // so the recycling has been successful. - ret = -1; - } - } - if (ret == 0) { // remove instance kv // ATTN: MUST ensure that cloud platform won't regenerate the same instance id @@ -721,9 +738,9 @@ int InstanceRecycler::recycle_deleted_instance() { int InstanceRecycler::recycle_indexes() { const std::string task_name = "recycle_indexes"; - int num_scanned = 0; - int num_expired = 0; - int num_recycled = 0; + int64_t num_scanned = 0; + int64_t num_expired = 0; + int64_t num_recycled = 0; RecycleIndexKeyInfo index_key_info0 {instance_id_, 0}; RecycleIndexKeyInfo index_key_info1 {instance_id_, INT64_MAX}; @@ -748,9 +765,11 @@ int InstanceRecycler::recycle_indexes() { .tag("num_recycled", num_recycled); }); - auto calc_expiration = [](const RecycleIndexPB& index) -> int64_t { + int64_t earlest_ts = std::numeric_limits::max(); + + auto calc_expiration = [&earlest_ts, this](const RecycleIndexPB& index) { if (config::force_immediate_recycle) { - return 0; + return 0L; } int64_t expiration = index.expiration() > 0 ? index.expiration() : index.creation_time(); int64_t retention_seconds = config::retention_seconds; @@ -758,7 +777,12 @@ int InstanceRecycler::recycle_indexes() { retention_seconds = std::min(config::dropped_index_retention_seconds, retention_seconds); } - return expiration + retention_seconds; + int64_t final_expiration = expiration + retention_seconds; + if (earlest_ts > final_expiration) { + earlest_ts = final_expiration; + g_bvar_recycler_recycle_index_earlest_ts.put(instance_id_, earlest_ts); + } + return final_expiration; }; // Elements in `index_keys` has the same lifetime as `it` in `scan_and_recycle` @@ -919,9 +943,9 @@ bool check_lazy_txn_finished(std::shared_ptr txn_kv, const std::string in int InstanceRecycler::recycle_partitions() { const std::string task_name = "recycle_partitions"; - int num_scanned = 0; - int num_expired = 0; - int num_recycled = 0; + int64_t num_scanned = 0; + int64_t num_expired = 0; + int64_t num_recycled = 0; RecyclePartKeyInfo part_key_info0 {instance_id_, 0}; RecyclePartKeyInfo part_key_info1 {instance_id_, INT64_MAX}; @@ -946,9 +970,11 @@ int InstanceRecycler::recycle_partitions() { .tag("num_recycled", num_recycled); }); - auto calc_expiration = [](const RecyclePartitionPB& partition) -> int64_t { + int64_t earlest_ts = std::numeric_limits::max(); + + auto calc_expiration = [&earlest_ts, this](const RecyclePartitionPB& partition) { if (config::force_immediate_recycle) { - return 0; + return 0L; } int64_t expiration = partition.expiration() > 0 ? partition.expiration() : partition.creation_time(); @@ -957,7 +983,12 @@ int InstanceRecycler::recycle_partitions() { retention_seconds = std::min(config::dropped_partition_retention_seconds, retention_seconds); } - return expiration + retention_seconds; + int64_t final_expiration = expiration + retention_seconds; + if (earlest_ts > final_expiration) { + earlest_ts = final_expiration; + g_bvar_recycler_recycle_partition_earlest_ts.put(instance_id_, earlest_ts); + } + return final_expiration; }; // Elements in `partition_keys` has the same lifetime as `it` in `scan_and_recycle` @@ -1074,8 +1105,8 @@ int InstanceRecycler::recycle_partitions() { } int InstanceRecycler::recycle_versions() { - int num_scanned = 0; - int num_recycled = 0; + int64_t num_scanned = 0; + int64_t num_recycled = 0; LOG_INFO("begin to recycle table and partition versions").tag("instance_id", instance_id_); @@ -1152,13 +1183,14 @@ int InstanceRecycler::recycle_versions() { int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_t partition_id, bool is_empty_tablet) { - int num_scanned = 0; - std::atomic_int num_recycled = 0; + int64_t num_scanned = 0; + std::atomic_long num_recycled = 0; std::string tablet_key_begin, tablet_key_end; std::string stats_key_begin, stats_key_end; std::string job_key_begin, job_key_end; + std::string tablet_belongs; if (partition_id > 0) { // recycle tablets in a partition belonging to the index meta_tablet_key({instance_id_, table_id, index_id, partition_id, 0}, &tablet_key_begin); @@ -1167,6 +1199,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ stats_tablet_key({instance_id_, table_id, index_id, partition_id + 1, 0}, &stats_key_end); job_tablet_key({instance_id_, table_id, index_id, partition_id, 0}, &job_key_begin); job_tablet_key({instance_id_, table_id, index_id, partition_id + 1, 0}, &job_key_end); + tablet_belongs = "partition"; } else { // recycle tablets in the index meta_tablet_key({instance_id_, table_id, index_id, 0, 0}, &tablet_key_begin); @@ -1175,9 +1208,10 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ stats_tablet_key({instance_id_, table_id, index_id + 1, 0, 0}, &stats_key_end); job_tablet_key({instance_id_, table_id, index_id, 0, 0}, &job_key_begin); job_tablet_key({instance_id_, table_id, index_id + 1, 0, 0}, &job_key_end); + tablet_belongs = "index"; } - LOG_INFO("begin to recycle tablets") + LOG_INFO("begin to recycle tablets of the " + tablet_belongs) .tag("table_id", table_id) .tag("index_id", index_id) .tag("partition_id", partition_id); @@ -1186,7 +1220,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ std::unique_ptr> defer_log_statistics((int*)0x01, [&](int*) { auto cost = duration(steady_clock::now() - start_time).count(); - LOG_INFO("recycle tablets finished, cost={}s", cost) + LOG_INFO("recycle tablets of " + tablet_belongs + " finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("table_id", table_id) .tag("index_id", index_id) @@ -1612,12 +1646,15 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { int InstanceRecycler::recycle_rowsets() { const std::string task_name = "recycle_rowsets"; - int num_scanned = 0; - int num_expired = 0; - int num_prepare = 0; - size_t total_rowset_size = 0; + int64_t num_scanned = 0; + int64_t num_expired = 0; + int64_t num_prepare = 0; + int64_t num_compacted = 0; + int64_t num_empty_rowset = 0; + size_t total_rowset_key_size = 0; + size_t total_rowset_value_size = 0; size_t expired_rowset_size = 0; - std::atomic_int num_recycled = 0; + std::atomic_long num_recycled = 0; RecycleRowsetKeyInfo recyc_rs_key_info0 {instance_id_, 0, ""}; RecycleRowsetKeyInfo recyc_rs_key_info1 {instance_id_, INT64_MAX, ""}; @@ -1640,8 +1677,11 @@ int InstanceRecycler::recycle_rowsets() { .tag("num_scanned", num_scanned) .tag("num_expired", num_expired) .tag("num_recycled", num_recycled) - .tag("num_prepare", num_prepare) - .tag("total_rowset_meta_size", total_rowset_size) + .tag("num_recycled.prepare", num_prepare) + .tag("num_recycled.compacted", num_compacted) + .tag("num_recycled.empty_rowset", num_empty_rowset) + .tag("total_rowset_meta_key_size_scanned", total_rowset_key_size) + .tag("total_rowset_meta_value_size_scanned", total_rowset_value_size) .tag("expired_rowset_meta_size", expired_rowset_size); }); @@ -1692,9 +1732,11 @@ int InstanceRecycler::recycle_rowsets() { return 0; }; - auto calc_expiration = [](const RecycleRowsetPB& rs) -> int64_t { + int64_t earlest_ts = std::numeric_limits::max(); + + auto calc_expiration = [&earlest_ts, this](const RecycleRowsetPB& rs) { if (config::force_immediate_recycle) { - return 0; + return 0L; } // RecycleRowsetPB created by compacted or dropped rowset has no expiration time, and will be recycled when exceed retention time int64_t expiration = rs.expiration() > 0 ? rs.expiration() : rs.creation_time(); @@ -1703,12 +1745,18 @@ int InstanceRecycler::recycle_rowsets() { retention_seconds = std::min(config::compacted_rowset_retention_seconds, retention_seconds); } - return expiration + retention_seconds; + int64_t final_expiration = expiration + retention_seconds; + if (earlest_ts > final_expiration) { + earlest_ts = final_expiration; + g_bvar_recycler_recycle_rowset_earlest_ts.put(instance_id_, earlest_ts); + } + return final_expiration; }; auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int { ++num_scanned; - total_rowset_size += v.size(); + total_rowset_key_size += k.size(); + total_rowset_value_size += v.size(); RecycleRowsetPB rowset; if (!rowset.ParseFromArray(v.data(), v.size())) { LOG_WARNING("malformed recycle rowset").tag("key", hex(k)); @@ -1780,9 +1828,12 @@ int InstanceRecycler::recycle_rowsets() { return -1; } } else { + num_compacted += rowset.type() == RecycleRowsetPB::COMPACT; rowset_keys.emplace_back(k); if (rowset_meta->num_segments() > 0) { // Skip empty rowset rowsets.push_back(std::move(*rowset_meta)); + } else { + ++num_empty_rowset; } } return 0; @@ -1823,8 +1874,7 @@ int InstanceRecycler::recycle_rowsets() { return ret; } -bool check_txn_abort(std::shared_ptr txn_kv, const std::string& instance_id, - int64_t txn_id) { +bool is_txn_aborted(std::shared_ptr txn_kv, const std::string& instance_id, int64_t txn_id) { std::unique_ptr txn; TxnErrorCode err = txn_kv->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -1883,11 +1933,12 @@ bool check_txn_abort(std::shared_ptr txn_kv, const std::string& instance_ int InstanceRecycler::recycle_tmp_rowsets() { const std::string task_name = "recycle_tmp_rowsets"; - int num_scanned = 0; - int num_expired = 0; - int num_recycled = 0; + int64_t num_scanned = 0; + int64_t num_expired = 0; + int64_t num_recycled = 0; size_t expired_rowset_size = 0; - size_t total_rowset_size = 0; + size_t total_rowset_key_size = 0; + size_t total_rowset_value_size = 0; MetaRowsetTmpKeyInfo tmp_rs_key_info0 {instance_id_, 0, 0}; MetaRowsetTmpKeyInfo tmp_rs_key_info1 {instance_id_, INT64_MAX, 0}; @@ -1910,41 +1961,54 @@ int InstanceRecycler::recycle_tmp_rowsets() { .tag("num_scanned", num_scanned) .tag("num_expired", num_expired) .tag("num_recycled", num_recycled) - .tag("total_rowset_meta_size", total_rowset_size) - .tag("expired_rowset_meta_size", expired_rowset_size); + .tag("total_rowset_meta_key_size_scanned", total_rowset_key_size) + .tag("total_rowset_meta_value_size_scanned", total_rowset_value_size) + .tag("expired_rowset_meta_size_recycled", expired_rowset_size); }); // Elements in `tmp_rowset_keys` has the same lifetime as `it` std::vector tmp_rowset_keys; std::vector tmp_rowsets; + int64_t earlest_ts = std::numeric_limits::max(); + auto calc_expiration = [&earlest_ts, this](const doris::RowsetMetaCloudPB& rowset) { + // ATTN: `txn_expiration` should > 0, however we use `creation_time` + a large `retention_time` (> 1 day in production environment) + // when `txn_expiration` <= 0 in some unexpected situation (usually when there are bugs). This is usually safe, coz loading + // duration or timeout always < `retention_time` in practice. + int64_t expiration = + rowset.txn_expiration() > 0 ? rowset.txn_expiration() : rowset.creation_time(); + expiration = config::force_immediate_recycle ? 0 : expiration; + int64_t final_expiration = expiration + config::retention_seconds; + if (earlest_ts > final_expiration) { + earlest_ts = final_expiration; + g_bvar_recycler_recycle_tmp_rowset_earlest_ts.put(instance_id_, earlest_ts); + } + return final_expiration; + }; + auto handle_rowset_kv = [&num_scanned, &num_expired, &tmp_rowset_keys, &tmp_rowsets, - &expired_rowset_size, &total_rowset_size, + &expired_rowset_size, &total_rowset_key_size, &total_rowset_value_size, + &calc_expiration, this](std::string_view k, std::string_view v) -> int { ++num_scanned; - total_rowset_size += v.size(); + total_rowset_key_size += k.size(); + total_rowset_value_size += v.size(); doris::RowsetMetaCloudPB rowset; if (!rowset.ParseFromArray(v.data(), v.size())) { LOG_WARNING("malformed rowset meta").tag("key", hex(k)); return -1; } - int64_t current_time = ::time(nullptr); - // ATTN: `txn_expiration` should > 0, however we use `creation_time` + a large `retention_time` (> 1 day in production environment) - // when `txn_expiration` <= 0 in some unexpected situation (usually when there are bugs). This is usually safe, coz loading - // duration or timeout always < `retention_time` in practice. - int64_t expiration = config::force_immediate_recycle ? 0 - : rowset.txn_expiration() > 0 ? rowset.txn_expiration() - : rowset.creation_time(); + int64_t expiration = calc_expiration(rowset); VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned << " num_expired=" << num_expired << " expiration=" << expiration << " txn_expiration=" << rowset.txn_expiration() << " rowset_creation_time=" << rowset.creation_time(); - if (current_time < expiration + config::retention_seconds) { - // not expired + int64_t current_time = ::time(nullptr); + if (current_time < expiration) { // not expired return 0; } - if (!check_txn_abort(txn_kv_, instance_id_, rowset.txn_id())) { + if (!is_txn_aborted(txn_kv_, instance_id_, rowset.txn_id())) { return 0; } @@ -1964,7 +2028,9 @@ int InstanceRecycler::recycle_tmp_rowsets() { << " tablet_id=" << rowset.tablet_id() << " rowset_id=" << rowset.rowset_id_v2() << " version=[" << rowset.start_version() << '-' << rowset.end_version() << "] txn_id=" << rowset.txn_id() << " rowset_meta_size=" << v.size() - << " creation_time" << rowset.creation_time(); + << " creation_time=" << rowset.creation_time() << " num_scanned=" << num_scanned + << " num_expired=" << num_expired; + tmp_rowset_keys.push_back(k); if (rowset.num_segments() > 0) { // Skip empty rowset tmp_rowsets.push_back(std::move(rowset)); @@ -1997,31 +2063,57 @@ int InstanceRecycler::scan_and_recycle( std::string begin, std::string_view end, std::function recycle_func, std::function loop_done) { + LOG(INFO) << "begin scan_and_recycle key_range=[" << hex(begin) << "," << hex(end) << ")"; int ret = 0; + int64_t cnt = 0; + int get_range_retried = 0; + std::string err; + std::unique_ptr> defer_log( + (int*)0x01, [begin, end, &err, &ret, &cnt, &get_range_retried](int*) { + LOG(INFO) << "finish scan_and_recycle key_range=[" << hex(begin) << "," << hex(end) + << ") num_scanned=" << cnt << " get_range_retried=" << get_range_retried + << " ret=" << ret << " err=" << err; + }); + std::unique_ptr it; do { - int get_ret = txn_get(txn_kv_.get(), begin, end, it); - if (get_ret != 0) { - LOG(WARNING) << "failed to get kv, key=" << begin << " ret=" << get_ret; + if (get_range_retried > 1000) { + err = "txn_get exceeds max retry, may not scan all keys"; + ret = -1; return -1; } - VLOG_DEBUG << "fetch " << it->size() << " kv"; + int get_ret = txn_get(txn_kv_.get(), begin, end, it); + if (get_ret != 0) { // txn kv may complain "Request for future version" + LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << "," << hex(end) + << ") num_scanned=" << cnt << " txn_get_ret=" << get_ret + << " get_range_retried=" << get_range_retried; + ++get_range_retried; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + continue; // try again + } if (!it->has_next()) { - VLOG_DEBUG << "no keys in the given range, begin=" << hex(begin) << " end=" << hex(end); - break; + LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," << hex(end) << ")"; + break; // scan finished } while (it->has_next()) { + ++cnt; // recycle corresponding resources auto [k, v] = it->next(); if (!it->has_next()) { begin = k; VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k); } - if (recycle_func(k, v) != 0) ret = -1; + // if we want to continue scanning, the recycle_func should not return non-zero + if (recycle_func(k, v) != 0) { + err = "recycle_func error"; + ret = -1; + } } begin.push_back('\x00'); // Update to next smallest key for iteration - if (loop_done) { - if (loop_done() != 0) ret = -1; + // if we want to continue scanning, the recycle_func should not return non-zero + if (loop_done && loop_done() != 0) { + err = "loop_done error"; + ret = -1; } } while (it->more() && !stopped()); return ret; @@ -2029,10 +2121,10 @@ int InstanceRecycler::scan_and_recycle( int InstanceRecycler::abort_timeout_txn() { const std::string task_name = "abort_timeout_txn"; - int num_scanned = 0; - int num_timeout = 0; - int num_abort = 0; - int num_advance = 0; + int64_t num_scanned = 0; + int64_t num_timeout = 0; + int64_t num_abort = 0; + int64_t num_advance = 0; TxnRunningKeyInfo txn_running_key_info0 {instance_id_, 0, 0}; TxnRunningKeyInfo txn_running_key_info1 {instance_id_, INT64_MAX, INT64_MAX}; @@ -2169,9 +2261,9 @@ int InstanceRecycler::abort_timeout_txn() { int InstanceRecycler::recycle_expired_txn_label() { const std::string task_name = "recycle_expired_txn_label"; - int num_scanned = 0; - int num_expired = 0; - int num_recycled = 0; + int64_t num_scanned = 0; + int64_t num_expired = 0; + int64_t num_recycled = 0; RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0}; RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id_, INT64_MAX, INT64_MAX}; @@ -2195,11 +2287,23 @@ int InstanceRecycler::recycle_expired_txn_label() { .tag("num_recycled", num_recycled); }); - int64_t current_time = + int64_t earlest_ts = std::numeric_limits::max(); + auto calc_expiration = [&earlest_ts, this](const RecycleTxnPB& recycle_txn_pb) { + int64_t final_expiration = + recycle_txn_pb.creation_time() + config::label_keep_max_second * 1000L; + if (earlest_ts > final_expiration / 1000) { + earlest_ts = final_expiration / 1000; + g_bvar_recycler_recycle_expired_txn_label_earlest_ts.put(instance_id_, earlest_ts); + } + return final_expiration; + }; + + int64_t current_time_ms = duration_cast(system_clock::now().time_since_epoch()).count(); - auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled, ¤t_time, this]( - std::string_view k, std::string_view v) -> int { + auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled, ¤t_time_ms, + &calc_expiration, + this](std::string_view k, std::string_view v) -> int { ++num_scanned; RecycleTxnPB recycle_txn_pb; if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) { @@ -2208,13 +2312,13 @@ int InstanceRecycler::recycle_expired_txn_label() { } if ((config::force_immediate_recycle) || (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) || - (recycle_txn_pb.creation_time() + config::label_keep_max_second * 1000L <= - current_time)) { - LOG_INFO("found recycle txn").tag("key", hex(k)); + (calc_expiration(recycle_txn_pb) <= current_time_ms)) { + VLOG_DEBUG << "found recycle txn, key=" << hex(k); num_expired++; } else { return 0; } + std::string_view k1 = k; //RecycleTxnKeyInfo 0:instance_id 1:db_id 2:txn_id k1.remove_prefix(1); // Remove key space @@ -2414,10 +2518,10 @@ struct BatchObjStoreAccessor { }; int InstanceRecycler::recycle_copy_jobs() { - int num_scanned = 0; - int num_finished = 0; - int num_expired = 0; - int num_recycled = 0; + int64_t num_scanned = 0; + int64_t num_finished = 0; + int64_t num_expired = 0; + int64_t num_recycled = 0; // Used for INTERNAL stage's copy jobs to tag each batch for log trace uint64_t batch_count = 0; const std::string task_name = "recycle_copy_jobs"; @@ -2659,8 +2763,8 @@ int InstanceRecycler::init_copy_job_accessor(const std::string& stage_id, } int InstanceRecycler::recycle_stage() { - int num_scanned = 0; - int num_recycled = 0; + int64_t num_scanned = 0; + int64_t num_recycled = 0; const std::string task_name = "recycle_stage"; LOG_INFO("begin to recycle stage").tag("instance_id", instance_id_); @@ -2680,12 +2784,9 @@ int InstanceRecycler::recycle_stage() { RecycleStageKeyInfo key_info0 {instance_id_, ""}; RecycleStageKeyInfo key_info1 {instance_id_, "\xff"}; - std::string key0; - std::string key1; - recycle_stage_key(key_info0, &key0); - recycle_stage_key(key_info1, &key1); + std::string key0 = recycle_stage_key(key_info0); + std::string key1 = recycle_stage_key(key_info1); - // Elements in `tmp_rowset_keys` has the same lifetime as `it` std::vector stage_keys; auto recycle_func = [&start_time, &num_scanned, &num_recycled, &stage_keys, this]( std::string_view k, std::string_view v) -> int { @@ -2775,6 +2876,12 @@ int InstanceRecycler::recycle_expired_stage_objects() { }); int ret = 0; for (const auto& stage : instance_info_.stages()) { + std::stringstream ss; + ss << "instance_id=" << instance_id_ << ", stage_id=" << stage.stage_id() + << ", user_name=" << stage.mysql_user_name().at(0) + << ", user_id=" << stage.mysql_user_id().at(0) + << ", prefix=" << stage.obj_info().prefix(); + if (stopped()) break; if (stage.type() == StagePB::EXTERNAL) { continue; @@ -2788,7 +2895,7 @@ int InstanceRecycler::recycle_expired_stage_objects() { const auto& old_obj = instance_info_.obj_info()[idx - 1]; auto s3_conf = S3Conf::from_obj_store_info(old_obj); if (!s3_conf) { - LOG(WARNING) << "failed to init accessor"; + LOG(WARNING) << "failed to init s3_conf with obj_info=" << old_obj.DebugString(); continue; } @@ -2796,16 +2903,18 @@ int InstanceRecycler::recycle_expired_stage_objects() { std::shared_ptr accessor; int ret1 = S3Accessor::create(std::move(*s3_conf), &accessor); if (ret1 != 0) { - LOG(WARNING) << "failed to init s3 accessor ret=" << ret1; + LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << " " << ss.str(); + ret = -1; + continue; + } + + if (s3_conf->prefix.find("/stage/") == std::string::npos) { + LOG(WARNING) << "try to delete illegal prefix, which is catastrophic, " << ss.str(); ret = -1; continue; } - LOG(INFO) << "recycle expired stage objects, instance_id=" << instance_id_ - << ", stage_id=" << stage.stage_id() - << ", user_name=" << stage.mysql_user_name().at(0) - << ", user_id=" << stage.mysql_user_id().at(0) - << ", prefix=" << stage.obj_info().prefix(); + LOG(INFO) << "recycle expired stage objects, " << ss.str(); int64_t expiration_time = duration_cast(system_clock::now().time_since_epoch()).count() - config::internal_stage_objects_expire_time_second; @@ -2814,8 +2923,8 @@ int InstanceRecycler::recycle_expired_stage_objects() { } ret1 = accessor->delete_all(expiration_time); if (ret1 != 0) { - LOG(WARNING) << "failed to recycle expired stage objects, instance_id=" << instance_id_ - << ", stage_id=" << stage.stage_id() << ", ret=" << ret1; + LOG(WARNING) << "failed to recycle expired stage objects, ret=" << ret1 << " " + << ss.str(); ret = -1; continue; } diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 91a461f474faed..cf23dcacd2fdca 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -55,7 +55,9 @@ struct RecyclerThreadPoolGroup { RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup& other) = default; RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = default; RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default; + // used for accessor.delete_files, accessor.delete_directory std::shared_ptr s3_producer_pool; + // used for InstanceRecycler::recycle_tablet std::shared_ptr recycle_tablet_pool; std::shared_ptr group_recycle_function_pool; }; @@ -128,19 +130,26 @@ class InstanceRecycler { // returns 0 for success otherwise error int recycle_deleted_instance(); - // scan and recycle expired indexes + // scan and recycle expired indexes: + // 1. dropped table, dropped mv + // 2. half-successtable/index when create // returns 0 for success otherwise error int recycle_indexes(); - // scan and recycle expired partitions + // scan and recycle expired partitions: + // 1. dropped parttion + // 2. half-success partition when create // returns 0 for success otherwise error int recycle_partitions(); - // scan and recycle expired rowsets + // scan and recycle expired rowsets: + // 1. prepare_rowset will produce recycle_rowset before uploading data to remote storage (memo) + // 2. compaction will change the input rowsets to recycle_rowset // returns 0 for success otherwise error int recycle_rowsets(); - // scan and recycle expired tmp rowsets + // scan and recycle expired tmp rowsets: + // 1. commit_rowset will produce tmp_rowset when finish upload data (load or compaction) to remote storage // returns 0 for success otherwise error int recycle_tmp_rowsets(); @@ -203,12 +212,15 @@ class InstanceRecycler { int scan_and_recycle(std::string begin, std::string_view end, std::function recycle_func, std::function loop_done = nullptr); + // return 0 for success otherwise error int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb); + // return 0 for success otherwise error // NOTE: this function ONLY be called when the file paths cannot be calculated int delete_rowset_data(const std::string& resource_id, int64_t tablet_id, const std::string& rowset_id); + // return 0 for success otherwise error int delete_rowset_data(const std::vector& rowsets); diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 1aca88d2d1161d..224b36c277c532 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -282,6 +282,11 @@ int S3Accessor::init() { Aws::Client::ClientConfiguration aws_config; aws_config.endpointOverride = conf_.endpoint; aws_config.region = conf_.region; + // Aws::Http::CurlHandleContainer::AcquireCurlHandle() may be blocked if the connecitons are bottleneck + aws_config.maxConnections = std::max((long)(config::recycle_pool_parallelism + + config::instance_recycler_worker_pool_size), + (long)aws_config.maxConnections); + if (config::s3_client_http_scheme == "http") { aws_config.scheme = Aws::Http::Scheme::HTTP; } @@ -349,7 +354,12 @@ int S3Accessor::delete_files(const std::vector& paths) { int S3Accessor::delete_file(const std::string& path) { LOG_INFO("delete file").tag("uri", to_uri(path)); - return obj_client_->delete_object({.bucket = conf_.bucket, .key = get_key(path)}).ret; + int ret = obj_client_->delete_object({.bucket = conf_.bucket, .key = get_key(path)}).ret; + static_assert(ObjectStorageResponse::OK == 0); + if (ret == ObjectStorageResponse::OK || ret == ObjectStorageResponse::NOT_FOUND) { + return 0; + } + return ret; } int S3Accessor::put_file(const std::string& path, const std::string& content) { @@ -392,21 +402,45 @@ int S3Accessor::check_versioning() { } int GcsAccessor::delete_prefix_impl(const std::string& path_prefix, int64_t expiration_time) { - LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix)); + LOG_INFO("begin delete prefix").tag("uri", to_uri(path_prefix)); int ret = 0; + int cnt = 0; + int skip = 0; + int64_t del_nonexisted = 0; + int del = 0; auto iter = obj_client_->list_objects({conf_.bucket, get_key(path_prefix)}); for (auto obj = iter->next(); obj.has_value(); obj = iter->next()) { + if (!(++cnt % 100)) { + LOG_INFO("loop delete prefix") + .tag("uri", to_uri(path_prefix)) + .tag("total_obj_cnt", cnt) + .tag("deleted", del) + .tag("del_nonexisted", del_nonexisted) + .tag("skipped", skip); + } if (expiration_time > 0 && obj->mtime_s > expiration_time) { + skip++; continue; } + del++; - // FIXME(plat1ko): Delete objects by batch - if (int del_ret = obj_client_->delete_object({conf_.bucket, obj->key}).ret; del_ret != 0) { + // FIXME(plat1ko): Delete objects by batch with genuine GCS client + int del_ret = obj_client_->delete_object({conf_.bucket, obj->key}).ret; + del_nonexisted += (del_ret == ObjectStorageResponse::NOT_FOUND); + static_assert(ObjectStorageResponse::OK == 0); + if (del_ret != ObjectStorageResponse::OK && del_ret != ObjectStorageResponse::NOT_FOUND) { ret = del_ret; } } + LOG_INFO("finish delete prefix") + .tag("uri", to_uri(path_prefix)) + .tag("total_obj_cnt", cnt) + .tag("deleted", del) + .tag("del_nonexisted", del_nonexisted) + .tag("skipped", skip); + if (!iter->is_valid()) { return -1; } diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp index c8dcdad18d7115..0e548819d25ce4 100644 --- a/cloud/src/recycler/s3_obj_client.cpp +++ b/cloud/src/recycler/s3_obj_client.cpp @@ -293,9 +293,12 @@ ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) { .tag("responseCode", static_cast(outcome.GetError().GetResponseCode())) .tag("error", outcome.GetError().GetMessage()) .tag("exception", outcome.GetError().GetExceptionName()); - return -1; + if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { + return {ObjectStorageResponse::NOT_FOUND, outcome.GetError().GetMessage()}; + } + return {ObjectStorageResponse::UNDEFINED, outcome.GetError().GetMessage()}; } - return 0; + return {ObjectStorageResponse::OK}; } ObjectStorageResponse S3ObjClient::delete_objects_recursively(ObjectStoragePathRef path,