Skip to content

Commit

Permalink
Add memory buffer to streaming cache manager (cvmfs#3632)
Browse files Browse the repository at this point in the history
Adds a (ring) buffer for the most recently downloaded objects to the streaming cache manager. This substantially reduces the number of HTTP requests because typically from a single file or chunk multiple blocks are requested together through fuse read callbacks.

The default buffer size is 64M. Can be changed through CVMFS_STREAMING_CACHE_BUFFER_SIZE.


* add ring buffer skeleton

* add RingBuffer class

* add RingBuffer::CopySlice()

* use ring buffer in streaming cache mgr

* add performance counters to streaming cache mgr

* add streaming cache manager integration test
  • Loading branch information
jblomer authored Nov 14, 2024
1 parent decfc73 commit f18b70a
Show file tree
Hide file tree
Showing 11 changed files with 540 additions and 8 deletions.
1 change: 1 addition & 0 deletions cvmfs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ if (BUILD_CVMFS OR BUILD_LIBCVMFS)
quota.cc
quota_posix.cc
resolv_conf_event_handler.cc
ring_buffer.cc
sanitizer.cc
sql.cc
sqlitemem.cc
Expand Down
115 changes: 111 additions & 4 deletions cvmfs/cache_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,33 @@
#include "network/download.h"
#include "network/sink.h"
#include "quota.h"
#include "statistics.h"
#include "util/mutex.h"
#include "util/platform.h"
#include "util/smalloc.h"


namespace {

class StreamingSink : public cvmfs::Sink {
public:
StreamingSink(void *buf, uint64_t size, uint64_t offset)
StreamingSink(void *buf, uint64_t size, uint64_t offset,
unsigned char *object)
: Sink(false /* is_owner */)
, pos_(0)
, window_buf_(buf)
, window_size_(size)
, window_offset_(offset)
, object_(object)
{ }

virtual ~StreamingSink() {}

virtual int64_t Write(const void *buf, uint64_t sz) {
if (object_) {
memcpy(object_ + pos_, buf, sz);
}

uint64_t old_pos = pos_;
pos_ += sz;

Expand Down Expand Up @@ -83,11 +91,44 @@ class StreamingSink : public cvmfs::Sink {
void *window_buf_;
uint64_t window_size_;
uint64_t window_offset_;
unsigned char *object_;
}; // class StreamingSink

static inline uint32_t hasher_any(const shash::Any &key) {
return *const_cast<uint32_t *>(
reinterpret_cast<const uint32_t *>(key.digest) + 1);
}

} // anonymous namespace


const size_t StreamingCacheManager::kDefaultBufferSize = 64 * 1024 * 1024;


StreamingCacheManager::Counters::Counters(perf::Statistics *statistics) {
sz_transferred_bytes = statistics->Register(
"streaming_cache_mgr.sz_transferred_bytes",
"Number of bytes downloaded by the streaming cache manager");
sz_transfer_ms = statistics->Register(
"streaming_cache_mgr.sz_transfer_ms",
"Time spent downloading data by the streaming cache manager");
n_downloads = statistics->Register(
"streaming_cache_mgr.n_downloads", "Number of objects requested remotely");
n_buffer_hits = statistics->Register(
"streaming_cache_mgr.n_buffer_hits",
"Number of requests served from the buffer");
n_buffer_evicts = statistics->Register(
"streaming_cache_mgr.n_buffer_evicts",
"Number of objects evicted from the buffer");
n_buffer_objects = statistics->Register(
"streaming_cache_mgr.n_buffer_objects", "Number of objects in the buffer");
n_buffer_obstacles = statistics->Register(
"streaming_cache_mgr.n_buffer_obstacles",
"Number of objects that could not be stored in the buffer "
"(e.g., too large)");
}


download::DownloadManager *StreamingCacheManager::SelectDownloadManager(
const FdInfo &info)
{
Expand All @@ -96,13 +137,38 @@ download::DownloadManager *StreamingCacheManager::SelectDownloadManager(
return regular_download_mgr_;
}


int64_t StreamingCacheManager::Stream(
const FdInfo &info,
void *buf,
uint64_t size,
uint64_t offset)
{
StreamingSink sink(buf, size, offset);
// Note: objects stored in the ring buffer are prepended by their hash

{
MutexLockGuard _(lock_buffer_);
RingBuffer::ObjectHandle_t handle;
if (buffered_objects_.Lookup(info.object_id, &handle)) {
perf::Inc(counters_->n_buffer_hits);
buffer_->CopySlice(handle, size, offset + sizeof(shash::Any), buf);
return buffer_->GetObjectSize(handle) - sizeof(shash::Any);
}
}

unsigned char *object = NULL;
size_t nbytes_in_buffer = 0;
if ((info.label.size != CacheManager::kSizeUnknown) &&
(info.label.size + sizeof(shash::Any) <= buffer_->GetMaxObjectSize()))
{
nbytes_in_buffer = sizeof(shash::Any) + info.label.size;
object = reinterpret_cast<unsigned char *>(smalloc(nbytes_in_buffer));
} else {
perf::Inc(counters_->n_buffer_obstacles);
}

StreamingSink sink(buf, size, offset,
object ? (object + sizeof(shash::Any)) : NULL);
std::string url;
if (info.label.IsExternal()) {
url = info.label.path;
Expand All @@ -124,12 +190,41 @@ int64_t StreamingCacheManager::Stream(
download_job.GetInterruptCuePtr());
}

SelectDownloadManager(info)->Fetch(&download_job);
{
uint64_t timestamp = platform_monotonic_time_ns();
SelectDownloadManager(info)->Fetch(&download_job);
perf::Xadd(counters_->sz_transfer_ms,
(platform_monotonic_time_ns() - timestamp) / (1000 * 1000));
}

perf::Inc(counters_->n_downloads);
perf::Xadd(counters_->sz_transferred_bytes, sink.GetNBytesStreamed());

if (download_job.error_code() != download::kFailOk) {
free(object);
return -EIO;
}

if (object) {
memcpy(object, &info.object_id, sizeof(shash::Any));
MutexLockGuard _(lock_buffer_);
while (!buffer_->HasSpaceFor(nbytes_in_buffer)) {
RingBuffer::ObjectHandle_t deleted_handle = buffer_->RemoveBack();
// As long as we don't add any new objects, the deleted_handle can still
// be accessed
shash::Any deleted_hash;
buffer_->CopySlice(deleted_handle, sizeof(shash::Any), 0, &deleted_hash);
buffered_objects_.Erase(deleted_hash);
perf::Inc(counters_->n_buffer_evicts);
perf::Dec(counters_->n_buffer_objects);
}
RingBuffer::ObjectHandle_t handle =
buffer_->PushFront(object, nbytes_in_buffer);
buffered_objects_.Insert(info.object_id, handle);
perf::Inc(counters_->n_buffer_objects);
}
free(object);

return sink.GetNBytesStreamed();
}

Expand All @@ -138,11 +233,14 @@ StreamingCacheManager::StreamingCacheManager(
unsigned max_open_fds,
CacheManager *cache_mgr,
download::DownloadManager *regular_download_mgr,
download::DownloadManager *external_download_mgr)
download::DownloadManager *external_download_mgr,
size_t buffer_size,
perf::Statistics *statistics)
: cache_mgr_(cache_mgr)
, regular_download_mgr_(regular_download_mgr)
, external_download_mgr_(external_download_mgr)
, fd_table_(max_open_fds, FdInfo())
, counters_(new Counters(statistics))
{
lock_fd_table_ =
reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
Expand All @@ -151,9 +249,18 @@ StreamingCacheManager::StreamingCacheManager(

delete quota_mgr_;
quota_mgr_ = cache_mgr_->quota_mgr();

buffer_ = new RingBuffer(buffer_size);
buffered_objects_.Init(16, shash::Any(), hasher_any);
lock_buffer_ =
reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
retval = pthread_mutex_init(lock_buffer_, NULL);
assert(retval == 0);
}

StreamingCacheManager::~StreamingCacheManager() {
pthread_mutex_destroy(lock_buffer_);
free(lock_buffer_);
pthread_mutex_destroy(lock_fd_table_);
free(lock_fd_table_);
quota_mgr_ = NULL; // gets deleted by cache_mgr_
Expand Down
34 changes: 33 additions & 1 deletion cvmfs/cache_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,44 @@
#include "cache.h"
#include "crypto/hash.h"
#include "fd_table.h"
#include "ring_buffer.h"
#include "smallhash.h"
#include "util/pointer.h"

namespace download {
class DownloadManager;
}
namespace perf {
class Counter;
class Statistics;
}

/**
* Cache manager that streams regular files using a download manager and stores
* file catalogs in an underlying cache manager.
*/
class StreamingCacheManager : public CacheManager {
public:
static const size_t kDefaultBufferSize;

struct Counters {
perf::Counter *sz_transferred_bytes;
perf::Counter *sz_transfer_ms;
perf::Counter *n_downloads;
perf::Counter *n_buffer_hits;
perf::Counter *n_buffer_evicts;
perf::Counter *n_buffer_objects;
perf::Counter *n_buffer_obstacles;

explicit Counters(perf::Statistics *statistics);
};

StreamingCacheManager(unsigned max_open_fds,
CacheManager *cache_mgr,
download::DownloadManager *regular_download_mgr,
download::DownloadManager *external_download_mgr);
download::DownloadManager *external_download_mgr,
size_t buffer_size,
perf::Statistics *statistics);
virtual ~StreamingCacheManager();

// In the files system / mountpoint initialization, we create the cache
Expand Down Expand Up @@ -88,6 +110,8 @@ class StreamingCacheManager : public CacheManager {
// root catalog fd, that has been already opened in the backing cache manager
int PlantFd(int fd_in_cache_mgr);

const Counters &counters() const { return *counters_; }

protected:
virtual void *DoSaveState();
virtual int DoRestoreState(void *data);
Expand Down Expand Up @@ -138,6 +162,14 @@ class StreamingCacheManager : public CacheManager {

pthread_mutex_t *lock_fd_table_;
FdTable<FdInfo> fd_table_;

/// A small in-memory cache to avoid frequent re-downloads if multiple blocks
/// from the same chunk are read
UniquePtr<RingBuffer> buffer_;
SmallHashDynamic<shash::Any, RingBuffer::ObjectHandle_t> buffered_objects_;
pthread_mutex_t *lock_buffer_;

UniquePtr<Counters> counters_;
}; // class StreamingCacheManager

#endif // CVMFS_CACHE_STREAM_H_
4 changes: 3 additions & 1 deletion cvmfs/cvmfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2843,7 +2843,9 @@ static bool RestoreState(const int fd_progress,
StreamingCacheManager(cvmfs::max_open_files_,
cvmfs::file_system_->cache_mgr(),
cvmfs::mount_point_->download_mgr(),
cvmfs::mount_point_->external_download_mgr());
cvmfs::mount_point_->external_download_mgr(),
StreamingCacheManager::kDefaultBufferSize,
cvmfs::file_system_->statistics());
fixup_root_fd = new_cache_mgr->PlantFd(old_root_fd);
cvmfs::file_system_->ReplaceCacheManager(new_cache_mgr);
cvmfs::mount_point_->fetcher()->ReplaceCacheManager(new_cache_mgr);
Expand Down
6 changes: 5 additions & 1 deletion cvmfs/mountpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,11 @@ bool FileSystem::TriageCacheMgr() {
unsigned nfiles = kDefaultNfiles;
if (options_mgr_->GetValue("CVMFS_NFILES", &optarg))
nfiles = String2Uint64(optarg);
cache_mgr_ = new StreamingCacheManager(nfiles, cache_mgr_, NULL, NULL);
size_t buffer_size = StreamingCacheManager::kDefaultBufferSize;
if (options_mgr_->GetValue("CVMFS_STREAMING_CACHE_BUFFER_SIZE", &optarg))
buffer_size = String2Uint64(optarg);
cache_mgr_ = new StreamingCacheManager(nfiles, cache_mgr_, NULL, NULL,
buffer_size, statistics_);
}

return true;
Expand Down
Loading

0 comments on commit f18b70a

Please sign in to comment.