Skip to content

Commit

Permalink
Improve performance of cache cleanup (cvmfs#3551)
Browse files Browse the repository at this point in the history
Currently, during eviction, the cache database is processed entry-by-entry with (usually) two SQL queries per entry (select + remove). This PR changes the processing to batch selection (1000 entries per batch) and a single remove call per cache cleanup. The unit tests show a drastic improvement for 50000 entries from a few 100ms to <1ms.

Fixes cvmfs#3491

* use large number of entries in quota mgr unit test

* split PosixQuotaManager::DoCleanup() in two methods

* load objects to be evicted from cache in batches

* remove entries from cache db in batches
  • Loading branch information
jblomer authored Nov 13, 2024
1 parent eccdaad commit decfc73
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 86 deletions.
184 changes: 110 additions & 74 deletions cvmfs/quota_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <limits>
#include <map>
#include <set>
#include <string>
Expand Down Expand Up @@ -150,6 +150,7 @@ void PosixQuotaManager::CloseDatabase() {
if (stmt_list_) sqlite3_finalize(stmt_list_);
if (stmt_lru_) sqlite3_finalize(stmt_lru_);
if (stmt_rm_) sqlite3_finalize(stmt_rm_);
if (stmt_rm_batch_) sqlite3_finalize(stmt_rm_batch_);
if (stmt_size_) sqlite3_finalize(stmt_size_);
if (stmt_touch_) sqlite3_finalize(stmt_touch_);
if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
Expand All @@ -164,6 +165,7 @@ void PosixQuotaManager::CloseDatabase() {
stmt_list_volatile_ = NULL;
stmt_list_ = NULL;
stmt_rm_ = NULL;
stmt_rm_batch_ = NULL;
stmt_size_ = NULL;
stmt_touch_ = NULL;
stmt_unpin_ = NULL;
Expand Down Expand Up @@ -472,96 +474,86 @@ bool PosixQuotaManager::DoCleanup(const uint64_t leave_size) {
return true;

// TODO(jblomer) transaction
LogCvmfs(kLogQuota, kLogSyslog,
LogCvmfs(kLogQuota, kLogSyslog | kLogDebug,
"clean up cache until at most %lu KB is used", leave_size/1024);
LogCvmfs(kLogQuota, kLogDebug, "gauge %" PRIu64, gauge_);
cleanup_recorder_.Tick();

bool result;
string hash_str;
vector<string> trash;

// Note that volatile files start counting from the smallest int64 number:
// the absolute sequence number with the first bit set in two's complement.
// So -1 can be a marker that will never appear in the database.
int64_t max_acseq = -1;
do {
sqlite3_reset(stmt_lru_);
if (sqlite3_step(stmt_lru_) != SQLITE_ROW) {
LogCvmfs(kLogQuota, kLogDebug, "could not get lru-entry");
sqlite3_bind_int64(stmt_lru_, 1, (max_acseq == -1) ?
std::numeric_limits<int64_t>::min() : (max_acseq + 1));

std::vector<EvictCandidate> candidates;
candidates.reserve(kEvictBatchSize);
string hash_str;
unsigned i = 0;
while (sqlite3_step(stmt_lru_) == SQLITE_ROW) {
hash_str = reinterpret_cast<const char *>(
sqlite3_column_text(stmt_lru_, 0));
LogCvmfs(kLogQuota, kLogDebug, "add %s to candidates for eviction",
hash_str.c_str());
candidates.push_back(EvictCandidate(
shash::MkFromHexPtr(shash::HexPtr(hash_str)),
sqlite3_column_int64(stmt_lru_, 1),
sqlite3_column_int64(stmt_lru_, 2)));
i++;
}
if (candidates.empty()) {
LogCvmfs(kLogQuota, kLogDebug, "no more entries to evict");
break;
}

hash_str = string(reinterpret_cast<const char *>(
sqlite3_column_text(stmt_lru_, 0)));
LogCvmfs(kLogQuota, kLogDebug, "removing %s", hash_str.c_str());
shash::Any hash = shash::MkFromHexPtr(shash::HexPtr(hash_str));
const unsigned N = candidates.size();
for (i = 0; i < N; ++i) {
// That's a critical condition. We must not delete a not yet inserted
// pinned file as it is already reserved (but will be inserted later).
// Instead, set the pin bit in the db to not run into an endless loop
if (pinned_chunks_.find(candidates[i].hash) != pinned_chunks_.end()) {
hash_str = candidates[i].hash.ToString();
LogCvmfs(kLogQuota, kLogDebug, "skip %s for eviction",
hash_str.c_str());
sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
SQLITE_STATIC);
result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
sqlite3_reset(stmt_block_);
assert(result);
continue;
}

// That's a critical condition. We must not delete a not yet inserted
// pinned file as it is already reserved (but will be inserted later).
// Instead, set the pin bit in the db to not run into an endless loop
if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
trash.push_back(cache_dir_ + "/" + hash.MakePathWithoutSuffix());
gauge_ -= sqlite3_column_int64(stmt_lru_, 1);
trash.push_back(cache_dir_ + "/" +
candidates[i].hash.MakePathWithoutSuffix());
gauge_ -= candidates[i].size;
max_acseq = candidates[i].acseq;
LogCvmfs(kLogQuota, kLogDebug, "lru cleanup %s, new gauge %" PRIu64,
hash_str.c_str(), gauge_);

sqlite3_bind_text(stmt_rm_, 1, &hash_str[0], hash_str.length(),
SQLITE_STATIC);
result = (sqlite3_step(stmt_rm_) == SQLITE_DONE);
sqlite3_reset(stmt_rm_);
candidates[i].hash.ToString().c_str(), gauge_);

if (!result) {
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
"failed to find %s in cache database (%d). "
"Cache database is out of sync. "
"Restart cvmfs with clean cache.", hash_str.c_str(), result);
return false;
}
} else {
sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
SQLITE_STATIC);
result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
sqlite3_reset(stmt_block_);
assert(result);
if (gauge_ <= leave_size)
break;
}
} while (gauge_ > leave_size);

result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
sqlite3_reset(stmt_unblock_);
assert(result);

// Double fork avoids zombie, forked removal process must not flush file
// buffers
if (!trash.empty()) {
if (async_delete_) {
pid_t pid;
int statloc;
if ((pid = fork()) == 0) {
// TODO(jblomer): eviciting files in the cache should perhaps become a
// thread. This would also allow to block the chunks and prevent the
// race with re-insertion. Then again, a thread can block umount.
#ifndef DEBUGMSG
CloseAllFildes(std::set<int>());
#endif
if (fork() == 0) {
for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
unlink(trash[i].c_str());
}
_exit(0);
}
_exit(0);
} else {
if (pid > 0)
waitpid(pid, &statloc, 0);
else
return false;
}
} else { // !async_delete_
for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
unlink(trash[i].c_str());
}
}
if (max_acseq != -1) {
sqlite3_bind_int64(stmt_rm_batch_, 1, max_acseq);
result = (sqlite3_step(stmt_rm_batch_) == SQLITE_DONE);
assert(result);
sqlite3_reset(stmt_rm_batch_);

result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
sqlite3_reset(stmt_unblock_);
assert(result);
}

if (!EmptyTrash(trash))
return false;

if (gauge_ > leave_size) {
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn,
"request to clean until %" PRIu64 ", "
Expand All @@ -571,6 +563,45 @@ bool PosixQuotaManager::DoCleanup(const uint64_t leave_size) {
return true;
}

bool PosixQuotaManager::EmptyTrash(const std::vector<std::string> &trash) {
if (trash.empty())
return true;

if (async_delete_) {
// Double fork avoids zombie, forked removal process must not flush file
// buffers
pid_t pid;
int statloc;
if ((pid = fork()) == 0) {
// TODO(jblomer): eviciting files in the cache should perhaps become a
// thread. This would also allow to block the chunks and prevent the
// race with re-insertion. Then again, a thread can block umount.
#ifndef DEBUGMSG
CloseAllFildes(std::set<int>());
#endif
if (fork() == 0) {
for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
unlink(trash[i].c_str());
}
_exit(0);
}
_exit(0);
} else {
if (pid > 0)
waitpid(pid, &statloc, 0);
else
return false;
}
} else { // !async_delete_
for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
unlink(trash[i].c_str());
}
}
return true;
}


void PosixQuotaManager::DoInsert(
const shash::Any &hash,
Expand Down Expand Up @@ -931,9 +962,13 @@ bool PosixQuotaManager::InitDatabase(const bool rebuild_database) {
sqlite3_prepare_v2(database_, "DELETE FROM cache_catalog WHERE sha1=:sha1;",
-1, &stmt_rm_, NULL);
sqlite3_prepare_v2(database_,
"SELECT sha1, size FROM cache_catalog WHERE "
"acseq=(SELECT min(acseq) "
"FROM cache_catalog WHERE pinned<>2);",
"DELETE FROM cache_catalog WHERE acseq<=:a AND pinned<>2;",
-1, &stmt_rm_batch_, NULL);
sqlite3_prepare_v2(database_, (std::string(
"SELECT sha1, size, acseq FROM cache_catalog "
"WHERE pinned<>2 AND acseq>=:a "
"ORDER BY acseq ASC "
"LIMIT ") + StringifyInt(kEvictBatchSize) + ";").c_str(),
-1, &stmt_lru_, NULL);
sqlite3_prepare_v2(database_,
("SELECT path FROM cache_catalog WHERE type=" +
Expand Down Expand Up @@ -1628,6 +1663,7 @@ PosixQuotaManager::PosixQuotaManager(
, stmt_lru_(NULL)
, stmt_size_(NULL)
, stmt_rm_(NULL)
, stmt_rm_batch_(NULL)
, stmt_list_(NULL)
, stmt_list_pinned_(NULL)
, stmt_list_catalogs_(NULL)
Expand Down
19 changes: 18 additions & 1 deletion cvmfs/quota_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,19 @@ class PosixQuotaManager : public QuotaManager {
};

/**
* Magic number to make reading PIDs from lockfiles more robust and versionable
* Used for batch queries in DoCleanup()
*/
struct EvictCandidate {
uint64_t size;
uint64_t acseq;
shash::Any hash;
EvictCandidate(const shash::Any &h, uint64_t s, uint64_t a)
: size(s), acseq(a), hash(h) {}
};

/**
* Magic number to make reading PIDs from lockfiles more robust and versionable
*/
static const unsigned kLockFileMagicNumber = 142857;

/**
Expand All @@ -196,6 +206,11 @@ class PosixQuotaManager : public QuotaManager {
*/
static const unsigned kCommandBufferSize = 32;

/**
* Batch size for database operations during DoCleanup()
*/
static const unsigned kEvictBatchSize = 1000;

/**
* Make sure that the amount of data transferred through the RPC pipe is
* within the OS's guarantees for atomicity.
Expand All @@ -220,6 +235,7 @@ class PosixQuotaManager : public QuotaManager {
void CloseDatabase();
bool Contains(const std::string &hash_str);
bool DoCleanup(const uint64_t leave_size);
bool EmptyTrash(const std::vector<std::string> &trash);

void MakeReturnPipe(int pipe[2]);
int BindReturnPipe(int pipe_wronly);
Expand Down Expand Up @@ -346,6 +362,7 @@ class PosixQuotaManager : public QuotaManager {
sqlite3_stmt *stmt_lru_;
sqlite3_stmt *stmt_size_;
sqlite3_stmt *stmt_rm_;
sqlite3_stmt *stmt_rm_batch_;
sqlite3_stmt *stmt_list_;
sqlite3_stmt *stmt_list_pinned_; /**< Loaded catalogs are pinned. */
sqlite3_stmt *stmt_list_catalogs_;
Expand Down
Loading

0 comments on commit decfc73

Please sign in to comment.