Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Nov 30, 2024
1 parent f3942a8 commit 5ae1274
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h

// Force the recycler to recycle all useless objects.
// **just for TEST**
CONF_mInt64(recycle_retention_test_seconds, "-1"); // 3h
CONF_Bool(force_immediate_recycle, "false");

CONF_String(test_s3_ak, "");
CONF_String(test_s3_sk, "");
Expand Down
55 changes: 32 additions & 23 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <string>
#include <string_view>
Expand Down Expand Up @@ -743,16 +744,17 @@ int InstanceRecycler::recycle_indexes() {
.tag("num_recycled", num_recycled);
});

auto calc_expiration = [](const RecycleIndexPB& index) {
auto calc_expiration = [](const RecycleIndexPB& index) -> int64_t {
if (config::force_immediate_recycle) {
return 0;
}
int64_t expiration = index.expiration() > 0 ? index.expiration() : index.creation_time();
int64_t retention_seconds = config::retention_seconds;
if (index.state() == RecycleIndexPB::DROPPED) {
retention_seconds =
std::min(config::dropped_index_retention_seconds, retention_seconds);
}
return config::recycle_retention_test_seconds > 0
? ::time(nullptr) + config::recycle_retention_test_seconds
: expiration + retention_seconds;
return expiration + retention_seconds;
};

// Elements in `index_keys` have the same lifetime as `it` in `scan_and_recycle`
Expand Down Expand Up @@ -940,17 +942,18 @@ int InstanceRecycler::recycle_partitions() {
.tag("num_recycled", num_recycled);
});

auto calc_expiration = [](const RecyclePartitionPB& partition) {
auto calc_expiration = [](const RecyclePartitionPB& partition) -> int64_t {
if (config::force_immediate_recycle) {
return 0;
}
int64_t expiration =
partition.expiration() > 0 ? partition.expiration() : partition.creation_time();
int64_t retention_seconds = config::retention_seconds;
if (partition.state() == RecyclePartitionPB::DROPPED) {
retention_seconds =
std::min(config::dropped_partition_retention_seconds, retention_seconds);
}
return config::recycle_retention_test_seconds > 0
? ::time(nullptr) + config::recycle_retention_test_seconds
: expiration + retention_seconds;
return expiration + retention_seconds;
};

// Elements in `partition_keys` have the same lifetime as `it` in `scan_and_recycle`
Expand Down Expand Up @@ -1684,17 +1687,18 @@ int InstanceRecycler::recycle_rowsets() {
return 0;
};

auto calc_expiration = [](const RecycleRowsetPB& rs) {
auto calc_expiration = [](const RecycleRowsetPB& rs) -> int64_t {
if (config::force_immediate_recycle) {
return 0;
}
// A RecycleRowsetPB created by a compacted or dropped rowset has no expiration time, and will be recycled once it exceeds the retention time
int64_t expiration = rs.expiration() > 0 ? rs.expiration() : rs.creation_time();
int64_t retention_seconds = config::retention_seconds;
if (rs.type() == RecycleRowsetPB::COMPACT || rs.type() == RecycleRowsetPB::DROP) {
retention_seconds =
std::min(config::compacted_rowset_retention_seconds, retention_seconds);
}
return config::recycle_retention_test_seconds > 0
? ::time(nullptr) + config::recycle_retention_test_seconds
: expiration + retention_seconds;
return expiration + retention_seconds;
};

auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int {
Expand Down Expand Up @@ -1923,10 +1927,9 @@ int InstanceRecycler::recycle_tmp_rowsets() {
// ATTN: `txn_expiration` should be > 0; however, we use `creation_time` + a large `retention_time` (> 1 day in production environments)
// when `txn_expiration` <= 0 in some unexpected situation (usually when there are bugs). This is usually safe, because the loading
// duration or timeout is always < `retention_time` in practice.
int64_t expiration = config::recycle_retention_test_seconds >= 0
? ::time(nullptr) + config::recycle_retention_test_seconds
: rowset.txn_expiration() > 0 ? rowset.txn_expiration()
: rowset.creation_time();
int64_t expiration = config::force_immediate_recycle ? 0
: rowset.txn_expiration() > 0 ? rowset.txn_expiration()
: rowset.creation_time();
VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned
<< " num_expired=" << num_expired << " expiration=" << expiration
<< " txn_expiration=" << rowset.txn_expiration()
Expand Down Expand Up @@ -2108,7 +2111,7 @@ int InstanceRecycler::abort_timeout_txn() {
LOG_WARNING("malformed txn_running_pb").tag("key", hex(k));
return -1;
}
if (txn_running_pb.timeout_time() > current_time) {
if (!config::force_immediate_recycle && txn_running_pb.timeout_time() > current_time) {
return 0;
}
++num_timeout;
Expand Down Expand Up @@ -2198,7 +2201,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
LOG_WARNING("malformed txn_running_pb").tag("key", hex(k));
return -1;
}
if ((recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
if ((config::force_immediate_recycle) ||
(recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
(recycle_txn_pb.creation_time() + config::label_keep_max_second * 1000L <=
current_time)) {
LOG_INFO("found recycle txn").tag("key", hex(k));
Expand Down Expand Up @@ -2494,14 +2498,16 @@ int InstanceRecycler::recycle_copy_jobs() {
int64_t current_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (copy_job.finish_time_ms() > 0) {
if (current_time <
copy_job.finish_time_ms() + config::copy_job_max_retention_second * 1000) {
if (!config::force_immediate_recycle &&
current_time < copy_job.finish_time_ms() +
config::copy_job_max_retention_second * 1000) {
return 0;
}
} else {
// For compatibility: before version 2.2.2 a copy job does not contain a finish time, so use the start time
if (current_time <
copy_job.start_time_ms() + config::copy_job_max_retention_second * 1000) {
if (!config::force_immediate_recycle &&
current_time < copy_job.start_time_ms() +
config::copy_job_max_retention_second * 1000) {
return 0;
}
}
Expand All @@ -2510,7 +2516,7 @@ int InstanceRecycler::recycle_copy_jobs() {
int64_t current_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
// If the copy job has timed out: delete all copy file kvs and the copy job kv
if (current_time <= copy_job.timeout_time_ms()) {
if (!config::force_immediate_recycle && current_time <= copy_job.timeout_time_ms()) {
return 0;
}
++num_expired;
Expand Down Expand Up @@ -2798,6 +2804,9 @@ int InstanceRecycler::recycle_expired_stage_objects() {
int64_t expiration_time =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
config::internal_stage_objects_expire_time_second;
if (config::force_immediate_recycle) {
expiration_time = INT64_MAX;
}
ret1 = accessor->delete_all(expiration_time);
if (ret1 != 0) {
LOG(WARNING) << "failed to recycle expired stage objects, instance_id=" << instance_id_
Expand Down

0 comments on commit 5ae1274

Please sign in to comment.