Skip to content

Commit

Permalink
[opt](recycler) Improve robustness and observability (apache#45617)
Browse files Browse the repository at this point in the history
1. Fix delete non-existed object, the original impl. does not handle
correctly
2. Add recycle lag indicators: bvars for index, partition, rowset and
txn
    1. `recycle_index_earlest_ts_${instance_id}` dropped table/mv 
    2. `recycle_partition_earlest_ts_${instance_id}` dropped partitions
    3. `recycle_rowset_earlest_ts_${instance_id}` compacted rowset
4. `recycle_tmp_rowset_earlest_ts_${instance_id}` aborted transactions
tmp data
5. `recycle_expired_txn_label_earlest_ts_${instance_id}` expired labels
and transactions
4. Add retry for scan_and_recycle() to prevent KV error like "Request
future version"
6. Fix recycle delete instance may leak object data: we have to delete
data first before delete KV
7. Improve code readability: remove duplicated code and add more
comments
  • Loading branch information
gavinchou authored Dec 24, 2024
1 parent d0f09fb commit c45d468
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 281 deletions.
160 changes: 62 additions & 98 deletions cloud/src/common/bvars.cpp

Large diffs are not rendered by default.

96 changes: 41 additions & 55 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,53 @@
#include <memory>
#include <mutex>
#include <string>

class BvarLatencyRecorderWithTag {
#include <type_traits>

/**
* Manage bvars that with similar names (identical prefix)
* ${module}_${name}_${tag}
* where `tag` is added automatically when calling `get` or `put`
*/
template <typename Bvar, bool is_status = false>
class BvarWithTag {
public:
BvarLatencyRecorderWithTag(std::string module, std::string name)
BvarWithTag(std::string module, std::string name)
: module_(std::move(module)), name_(std::move(name)) {}

void put(const std::string& tag, int64_t value) {
std::shared_ptr<bvar::LatencyRecorder> instance = nullptr;
template <typename ValType>
requires std::is_integral_v<ValType>
void put(const std::string& tag, ValType value) {
std::shared_ptr<Bvar> instance = nullptr;
{
std::lock_guard<bthread::Mutex> l(mutex_);
auto it = bvar_map_.find(tag);
if (it == bvar_map_.end()) {
instance = std::make_shared<bvar::LatencyRecorder>(module_, name_ + "_" + tag);
instance = std::make_shared<Bvar>(module_, name_ + "_" + tag, ValType());
bvar_map_[tag] = instance;
} else {
instance = it->second;
}
}
(*instance) << value;
// FIXME(gavin): check bvar::Adder and more
if constexpr (std::is_same_v<Bvar, bvar::LatencyRecorder>) {
(*instance) << value;
} else if constexpr (is_status) {
instance->set_value(value);
} else {
// This branch mean to be unreachable, add an assert(false) here to
// prevent missing branch match.
// Postpone deduction of static_assert by evaluating sizeof(T)
static_assert(!sizeof(Bvar), "all types must be matched with if constexpr");
}
}

std::shared_ptr<bvar::LatencyRecorder> get(const std::string& tag) {
std::shared_ptr<bvar::LatencyRecorder> instance = nullptr;
std::shared_ptr<Bvar> get(const std::string& tag) {
std::shared_ptr<Bvar> instance = nullptr;
std::lock_guard<bthread::Mutex> l(mutex_);

auto it = bvar_map_.find(tag);
if (it == bvar_map_.end()) {
instance = std::make_shared<bvar::LatencyRecorder>(module_, name_ + "_" + tag);
instance = std::make_shared<Bvar>(module_, name_ + "_" + tag);
bvar_map_[tag] = instance;
return instance;
}
Expand All @@ -69,54 +88,14 @@ class BvarLatencyRecorderWithTag {
bthread::Mutex mutex_;
std::string module_;
std::string name_;
std::map<std::string, std::shared_ptr<bvar::LatencyRecorder>> bvar_map_;
std::map<std::string, std::shared_ptr<Bvar>> bvar_map_;
};

template <class T>
class BvarStatusWithTag {
public:
BvarStatusWithTag(std::string module, std::string name)
: module_(std::move(module)), name_(std::move(name)) {}

void put(const std::string& tag, T value) {
std::shared_ptr<bvar::Status<T>> instance = nullptr;
{
std::lock_guard<bthread::Mutex> l(mutex_);
auto it = bvar_map_.find(tag);
if (it == bvar_map_.end()) {
instance = std::make_shared<bvar::Status<T>>(module_, name_ + "_" + tag, T());
bvar_map_[tag] = instance;
} else {
instance = it->second;
}
}
(*instance).set_value(value);
}

std::shared_ptr<bvar::Status<T>> get(const std::string& tag) {
std::shared_ptr<bvar::Status<T>> instance = nullptr;
std::lock_guard<bthread::Mutex> l(mutex_);

auto it = bvar_map_.find(tag);
if (it == bvar_map_.end()) {
instance = std::make_shared<bvar::Status<T>>(module_, name_ + "_" + tag);
bvar_map_[tag] = instance;
return instance;
}
return it->second;
}

void remove(const std::string& tag) {
std::lock_guard<bthread::Mutex> l(mutex_);
bvar_map_.erase(tag);
}
using BvarLatencyRecorderWithTag = BvarWithTag<bvar::LatencyRecorder>;

private:
bthread::Mutex mutex_;
std::string module_;
std::string name_;
std::map<std::string, std::shared_ptr<bvar::Status<T>>> bvar_map_;
};
template <typename T>
requires std::is_integral_v<T>
using BvarStatusWithTag = BvarWithTag<bvar::Status<T>, true>;

// meta-service's bvars
extern BvarLatencyRecorderWithTag g_bvar_ms_begin_txn;
Expand Down Expand Up @@ -182,6 +161,13 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;

// recycler's bvars
extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_index_earlest_ts;
extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_partition_earlest_ts;
extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_rowset_earlest_ts;
extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_tmp_rowset_earlest_ts;
extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_expired_txn_label_earlest_ts;

// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
extern bvar::LatencyRecorder g_bvar_txn_kv_range_get;
Expand Down
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ CONF_mInt64(dropped_partition_retention_seconds, "10800"); // 3h
CONF_Strings(recycle_whitelist, ""); // Comma seprated list
// These instances will not be recycled, only effective when whitelist is empty.
CONF_Strings(recycle_blacklist, ""); // Comma seprated list
// IO worker thread pool concurrency: object list, delete
CONF_mInt32(instance_recycler_worker_pool_size, "32");
CONF_Bool(enable_checker, "false");
// The parallelism for parallel recycle operation
// s3_producer_pool recycle_tablet_pool, delete single object in this pool
CONF_Int32(recycle_pool_parallelism, "40");
// Currently only used for recycler test
CONF_Bool(enable_inverted_check, "false");
Expand Down
1 change: 1 addition & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,7 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
<< ", rowset_id=" << rowset_id
<< ", rowset_meta_bytes=" << rowset_meta.ByteSizeLong()
<< ", segment_key_bounds_bytes=" << segment_key_bounds_bytes
<< ", num_segments=" << rowset_meta.num_segments()
<< ", rowset_meta=" << rowset_meta.ShortDebugString();
}
code = cast_as<ErrCategory::COMMIT>(err);
Expand Down
10 changes: 8 additions & 2 deletions cloud/src/recycler/obj_storage_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@ struct ObjectStoragePathRef {
};

struct ObjectStorageResponse {
ObjectStorageResponse(int r = 0, std::string msg = "") : ret(r), error_msg(std::move(msg)) {}
enum Code : int {
UNDEFINED = -1,
OK = 0,
NOT_FOUND = 1,
};

ObjectStorageResponse(int r = OK, std::string msg = "") : ret(r), error_msg(std::move(msg)) {}
// clang-format off
int ret {0}; // To unify the error handle logic with BE, we'd better use the same error code as BE
int ret {OK}; // To unify the error handle logic with BE, we'd better use the same error code as BE
// clang-format on
std::string error_msg;
};
Expand Down
Loading

0 comments on commit c45d468

Please sign in to comment.