diff --git a/cvmfs/CMakeLists.txt b/cvmfs/CMakeLists.txt index 862c646540..1861afa97a 100644 --- a/cvmfs/CMakeLists.txt +++ b/cvmfs/CMakeLists.txt @@ -87,6 +87,7 @@ if (BUILD_CVMFS OR BUILD_LIBCVMFS) quota.cc quota_posix.cc resolv_conf_event_handler.cc + ring_buffer.cc sanitizer.cc sql.cc sqlitemem.cc diff --git a/cvmfs/cache_stream.cc b/cvmfs/cache_stream.cc index a04bf8c664..f51bff8ed7 100644 --- a/cvmfs/cache_stream.cc +++ b/cvmfs/cache_stream.cc @@ -14,7 +14,9 @@ #include "network/download.h" #include "network/sink.h" #include "quota.h" +#include "statistics.h" #include "util/mutex.h" +#include "util/platform.h" #include "util/smalloc.h" @@ -22,17 +24,23 @@ namespace { class StreamingSink : public cvmfs::Sink { public: - StreamingSink(void *buf, uint64_t size, uint64_t offset) + StreamingSink(void *buf, uint64_t size, uint64_t offset, + unsigned char *object) : Sink(false /* is_owner */) , pos_(0) , window_buf_(buf) , window_size_(size) , window_offset_(offset) + , object_(object) { } virtual ~StreamingSink() {} virtual int64_t Write(const void *buf, uint64_t sz) { + if (object_) { + memcpy(object_ + pos_, buf, sz); + } + uint64_t old_pos = pos_; pos_ += sz; @@ -83,11 +91,44 @@ class StreamingSink : public cvmfs::Sink { void *window_buf_; uint64_t window_size_; uint64_t window_offset_; + unsigned char *object_; }; // class StreamingSink +static inline uint32_t hasher_any(const shash::Any &key) { + return *const_cast( + reinterpret_cast(key.digest) + 1); +} + } // anonymous namespace +const size_t StreamingCacheManager::kDefaultBufferSize = 64 * 1024 * 1024; + + +StreamingCacheManager::Counters::Counters(perf::Statistics *statistics) { + sz_transferred_bytes = statistics->Register( + "streaming_cache_mgr.sz_transferred_bytes", + "Number of bytes downloaded by the streaming cache manager"); + sz_transfer_ms = statistics->Register( + "streaming_cache_mgr.sz_transfer_ms", + "Time spent downloading data by the streaming cache manager"); + n_downloads = statistics->Register( + "streaming_cache_mgr.n_downloads", "Number of objects requested remotely"); + n_buffer_hits = statistics->Register( + "streaming_cache_mgr.n_buffer_hits", + "Number of requests served from the buffer"); + n_buffer_evicts = statistics->Register( + "streaming_cache_mgr.n_buffer_evicts", + "Number of objects evicted from the buffer"); + n_buffer_objects = statistics->Register( + "streaming_cache_mgr.n_buffer_objects", "Number of objects in the buffer"); + n_buffer_obstacles = statistics->Register( + "streaming_cache_mgr.n_buffer_obstacles", + "Number of objects that could not be stored in the buffer " + "(e.g., too large)"); +} + + download::DownloadManager *StreamingCacheManager::SelectDownloadManager( const FdInfo &info) { @@ -96,13 +137,38 @@ download::DownloadManager *StreamingCacheManager::SelectDownloadManager( return regular_download_mgr_; } + int64_t StreamingCacheManager::Stream( const FdInfo &info, void *buf, uint64_t size, uint64_t offset) { - StreamingSink sink(buf, size, offset); + // Note: objects stored in the ring buffer are prepended by their hash + + { + MutexLockGuard _(lock_buffer_); + RingBuffer::ObjectHandle_t handle; + if (buffered_objects_.Lookup(info.object_id, &handle)) { + perf::Inc(counters_->n_buffer_hits); + buffer_->CopySlice(handle, size, offset + sizeof(shash::Any), buf); + return buffer_->GetObjectSize(handle) - sizeof(shash::Any); + } + } + + unsigned char *object = NULL; + size_t nbytes_in_buffer = 0; + if ((info.label.size != CacheManager::kSizeUnknown) && + (info.label.size + sizeof(shash::Any) <= buffer_->GetMaxObjectSize())) + { + nbytes_in_buffer = sizeof(shash::Any) + info.label.size; + object = reinterpret_cast(smalloc(nbytes_in_buffer)); + } else { + perf::Inc(counters_->n_buffer_obstacles); + } + + StreamingSink sink(buf, size, offset, + object ? (object + sizeof(shash::Any)) : NULL); std::string url; if (info.label.IsExternal()) { url = info.label.path; @@ -124,12 +190,41 @@ int64_t StreamingCacheManager::Stream( download_job.GetInterruptCuePtr()); } - SelectDownloadManager(info)->Fetch(&download_job); + { + uint64_t timestamp = platform_monotonic_time_ns(); + SelectDownloadManager(info)->Fetch(&download_job); + perf::Xadd(counters_->sz_transfer_ms, + (platform_monotonic_time_ns() - timestamp) / (1000 * 1000)); + } + + perf::Inc(counters_->n_downloads); + perf::Xadd(counters_->sz_transferred_bytes, sink.GetNBytesStreamed()); if (download_job.error_code() != download::kFailOk) { + free(object); return -EIO; } + if (object) { + memcpy(object, &info.object_id, sizeof(shash::Any)); + MutexLockGuard _(lock_buffer_); + while (!buffer_->HasSpaceFor(nbytes_in_buffer)) { + RingBuffer::ObjectHandle_t deleted_handle = buffer_->RemoveBack(); + // As long as we don't add any new objects, the deleted_handle can still + // be accessed + shash::Any deleted_hash; + buffer_->CopySlice(deleted_handle, sizeof(shash::Any), 0, &deleted_hash); + buffered_objects_.Erase(deleted_hash); + perf::Inc(counters_->n_buffer_evicts); + perf::Dec(counters_->n_buffer_objects); + } + RingBuffer::ObjectHandle_t handle = + buffer_->PushFront(object, nbytes_in_buffer); + buffered_objects_.Insert(info.object_id, handle); + perf::Inc(counters_->n_buffer_objects); + } + free(object); + return sink.GetNBytesStreamed(); } @@ -138,11 +233,14 @@ StreamingCacheManager::StreamingCacheManager( unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, - download::DownloadManager *external_download_mgr) + download::DownloadManager *external_download_mgr, + size_t buffer_size, + perf::Statistics *statistics) : cache_mgr_(cache_mgr) , regular_download_mgr_(regular_download_mgr) , external_download_mgr_(external_download_mgr) , fd_table_(max_open_fds, FdInfo()) + , counters_(new Counters(statistics)) { lock_fd_table_ = reinterpret_cast(smalloc(sizeof(pthread_mutex_t))); @@ -151,9 +249,18 @@ StreamingCacheManager::StreamingCacheManager( delete quota_mgr_; quota_mgr_ = cache_mgr_->quota_mgr(); + + buffer_ = new RingBuffer(buffer_size); + buffered_objects_.Init(16, shash::Any(), hasher_any); + lock_buffer_ = + reinterpret_cast(smalloc(sizeof(pthread_mutex_t))); + retval = pthread_mutex_init(lock_buffer_, NULL); + assert(retval == 0); } StreamingCacheManager::~StreamingCacheManager() { + pthread_mutex_destroy(lock_buffer_); + free(lock_buffer_); pthread_mutex_destroy(lock_fd_table_); free(lock_fd_table_); quota_mgr_ = NULL; // gets deleted by cache_mgr_ diff --git a/cvmfs/cache_stream.h b/cvmfs/cache_stream.h index 833d60394f..ceffe68b5d 100644 --- a/cvmfs/cache_stream.h +++ b/cvmfs/cache_stream.h @@ -12,11 +12,17 @@ #include "cache.h" #include "crypto/hash.h" #include "fd_table.h" +#include "ring_buffer.h" +#include "smallhash.h" #include "util/pointer.h" namespace download { class DownloadManager; } +namespace perf { +class Counter; +class Statistics; +} /** * Cache manager that streams regular files using a download manager and stores @@ -24,10 +30,26 @@ class DownloadManager; */ class StreamingCacheManager : public CacheManager { public: + static const size_t kDefaultBufferSize; + + struct Counters { + perf::Counter *sz_transferred_bytes; + perf::Counter *sz_transfer_ms; + perf::Counter *n_downloads; + perf::Counter *n_buffer_hits; + perf::Counter *n_buffer_evicts; + perf::Counter *n_buffer_objects; + perf::Counter *n_buffer_obstacles; + + explicit Counters(perf::Statistics *statistics); + }; + StreamingCacheManager(unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, - download::DownloadManager *external_download_mgr); + download::DownloadManager *external_download_mgr, + size_t buffer_size, + perf::Statistics *statistics); virtual ~StreamingCacheManager(); // In the files system / mountpoint initialization, we create the cache @@ -88,6 +110,8 @@ class StreamingCacheManager : public CacheManager { // root catalog fd, that has been already opened in the backing cache manager int PlantFd(int fd_in_cache_mgr); + const Counters &counters() const { return *counters_; } + protected: virtual void *DoSaveState(); virtual int DoRestoreState(void *data); @@ -138,6 +162,14 @@ class StreamingCacheManager : public CacheManager { pthread_mutex_t *lock_fd_table_; FdTable fd_table_; + + /// A small in-memory cache to avoid frequent re-downloads if multiple blocks + /// from the same chunk are read + UniquePtr buffer_; + SmallHashDynamic buffered_objects_; + pthread_mutex_t *lock_buffer_; + + UniquePtr counters_; }; // class StreamingCacheManager #endif // CVMFS_CACHE_STREAM_H_ diff --git a/cvmfs/cvmfs.cc b/cvmfs/cvmfs.cc index a213c4d253..ba847ccb99 100644 --- a/cvmfs/cvmfs.cc +++ b/cvmfs/cvmfs.cc @@ -2843,7 +2843,9 @@ static bool RestoreState(const int fd_progress, StreamingCacheManager(cvmfs::max_open_files_, cvmfs::file_system_->cache_mgr(), cvmfs::mount_point_->download_mgr(), - cvmfs::mount_point_->external_download_mgr()); + cvmfs::mount_point_->external_download_mgr(), + StreamingCacheManager::kDefaultBufferSize, + cvmfs::file_system_->statistics()); fixup_root_fd = new_cache_mgr->PlantFd(old_root_fd); cvmfs::file_system_->ReplaceCacheManager(new_cache_mgr); cvmfs::mount_point_->fetcher()->ReplaceCacheManager(new_cache_mgr); diff --git a/cvmfs/mountpoint.cc b/cvmfs/mountpoint.cc index f2b62cece5..af5c4921b0 100644 --- a/cvmfs/mountpoint.cc +++ b/cvmfs/mountpoint.cc @@ -1179,7 +1179,11 @@ bool FileSystem::TriageCacheMgr() { unsigned nfiles = kDefaultNfiles; if (options_mgr_->GetValue("CVMFS_NFILES", &optarg)) nfiles = String2Uint64(optarg); - cache_mgr_ = new StreamingCacheManager(nfiles, cache_mgr_, NULL, NULL); + size_t buffer_size = StreamingCacheManager::kDefaultBufferSize; + if (options_mgr_->GetValue("CVMFS_STREAMING_CACHE_BUFFER_SIZE", &optarg)) + buffer_size = String2Uint64(optarg); + cache_mgr_ = new StreamingCacheManager(nfiles, cache_mgr_, NULL, NULL, + buffer_size, statistics_); } return true; diff --git a/cvmfs/ring_buffer.cc b/cvmfs/ring_buffer.cc new file mode 100644 index 0000000000..acd1a39638 --- /dev/null +++ b/cvmfs/ring_buffer.cc @@ -0,0 +1,117 @@ +/** + * This file is part of the CernVM File System. + */ + +#include "cvmfs_config.h" +#include "ring_buffer.h" + +#include +#include +#include +#include + +#include "util/smalloc.h" + +const RingBuffer::ObjectHandle_t RingBuffer::kInvalidObjectHandle = size_t(-1); + + +RingBuffer::RingBuffer(size_t total_size) + : total_size_(total_size) + , free_space_(total_size) + , front_(0) + , back_(0) + , buffer_(reinterpret_cast(sxmmap(total_size_))) +{ + assert(total_size_ >= sizeof(size_t)); +} + + +RingBuffer::~RingBuffer() { + sxunmap(buffer_, total_size_); +} + + +void RingBuffer::Put(const void *data, size_t size) { + const size_t size_head = std::min(size, total_size_ - front_); + if (size_head > 0) + memcpy(buffer_ + front_, data, size_head); + + if (size_head < size) { + const size_t size_tail = size - size_head; + memcpy(buffer_, reinterpret_cast(data) + size_head, + size_tail); + } + + front_ = (front_ + size) % total_size_; + free_space_ -= size; +} + + +void RingBuffer::Get(size_t from, size_t size, void *to) const { + const size_t size_head = std::min(size, total_size_ - from); + if (size_head > 0) + memcpy(to, buffer_ + from, size_head); + + if (size_head < size) { + const size_t size_tail = size - size_head; + memcpy(reinterpret_cast(to) + size_head, buffer_, + size_tail); + } +} + + +void RingBuffer::Shrink(size_t by) { + back_ = (back_ + by) % total_size_; + free_space_ += by; +} + + +size_t RingBuffer::GetObjectSize(ObjectHandle_t handle) const { + size_t size_tag; + Get(handle, sizeof(size_tag), &size_tag); + assert(size_tag <= total_size_); + return size_tag; +} + + +RingBuffer::ObjectHandle_t RingBuffer::PushFront(const void *obj, size_t size) { + size_t size_tag = size; + size += sizeof(size_tag); + if (size > free_space_) { + return kInvalidObjectHandle; + } + + ObjectHandle_t result = front_; + + Put(&size_tag, sizeof(size_tag)); + Put(obj, size_tag); + + return result; +} + + +RingBuffer::ObjectHandle_t RingBuffer::RemoveBack() { + ObjectHandle_t result = back_; + + size_t size_tag = GetObjectSize(result); + Shrink(sizeof(size_tag)); + Shrink(size_tag); + + return result; +} + + +void RingBuffer::CopyObject(ObjectHandle_t handle, void *to) const +{ + size_t size_tag = GetObjectSize(handle); + ObjectHandle_t object = (handle + sizeof(size_tag)) % total_size_; + Get(object, size_tag, to); +} + + +void RingBuffer::CopySlice(ObjectHandle_t handle, size_t size, size_t offset, + void *to) const +{ + ObjectHandle_t begin = (handle + sizeof(size_t) + offset) % total_size_; + Get(begin, size, to); +} diff --git a/cvmfs/ring_buffer.h b/cvmfs/ring_buffer.h new file mode 100644 index 0000000000..ef7775dea1 --- /dev/null +++ b/cvmfs/ring_buffer.h @@ -0,0 +1,104 @@ +/** + * This file is part of the CernVM File System. + */ + +#ifndef CVMFS_RING_BUFFER_H_ +#define CVMFS_RING_BUFFER_H_ + +#include + +#include "util/single_copy.h" + +/** + * A ring buffer that allows appending objects to the front and removing + * objects from the back. In the memory area, we prepend the object data with + * the size of the object. + */ +class RingBuffer : SingleCopy { + public: + /** + * The offset in buffer_ + */ + typedef size_t ObjectHandle_t; + static const ObjectHandle_t kInvalidObjectHandle; + + explicit RingBuffer(size_t total_size); + ~RingBuffer(); + + /** + * Returns kInvalidObjectHandle if there is not enough space to insert the + * object. + */ + ObjectHandle_t PushFront(const void *obj, size_t size); + + /** + * Returns the handle of the remove object or kInvalidObjectHandle if the + * ring buffer is empty + */ + ObjectHandle_t RemoveBack(); + + /** + * The passed object handle must be valid + */ + size_t GetObjectSize(ObjectHandle_t handle) const; + + /** + * Note that we cannot just return a pointer because an object may be + * split between the end and the beginning of the backing memory area + */ + void CopyObject(ObjectHandle_t handle, void *to) const; + + /** + * Copies a sub range of the object + */ + void CopySlice(ObjectHandle_t handle, size_t size, size_t offset, + void *to) const; + + // All objects are prepended by a size tag, so we can store objects only + // up to the available space minus the size of the size tag + size_t GetMaxObjectSize() const { return total_size_ - sizeof(size_t); } + bool HasSpaceFor(size_t size) const { + return free_space_ >= size + sizeof(size_t); + } + + size_t free_space() const { return free_space_; } + + private: + /** + * Writes data into the ring buffer at front_, taking care of a potential + * buffer wrap. Assumes that the buffer has enough space. + */ + void Put(const void *data, size_t size); + /** + * Copies data from the buffer to the memory area "to". The memory area + * has to be big enough. Takes care of a potential buffer wrap. + */ + void Get(size_t from, size_t size, void *to) const; + /** + * Moves the back_ pointer forward, taking care of potential buffer wraps + */ + void Shrink(size_t by); + + /** + * Size in bytes of the memory area backing the ring buffer + */ + const size_t total_size_; + /** + * Size in bytes of the unused space in the buffer + */ + size_t free_space_; + /** + * Pointer to the front of the memory buffer, in [0, size_ - 1] + */ + size_t front_; + /** + * Pointer to the back of the memory buffer, in [0, size_ - 1] + */ + size_t back_; + /** + * The memory area backing the ring buffer + */ + unsigned char *buffer_; +}; // class RingBuffer + +#endif // CVMFS_RING_H_ diff --git a/test/src/105-streaming-cache/main b/test/src/105-streaming-cache/main new file mode 100644 index 0000000000..63c5d19c79 --- /dev/null +++ b/test/src/105-streaming-cache/main @@ -0,0 +1,48 @@ +#!/bin/bash + +cvmfs_test_name="Browsing with the streaming cache manager" +cvmfs_test_suites="quick" + +cvmfs_run_test() { + logfile=$1 + + echo "*** mount alice.cern.ch" + cvmfs_mount alice.cern.ch "CVMFS_AUTO_UPDATE=false" || return 1 + + local root_hash=$(get_xattr root_hash /cvmfs/alice.cern.ch/) + echo "*** root hash is $root_hash" + + local nfiles=100 + find /cvmfs/alice.cern.ch -type f -size +100k -size -10M | head -n $nfiles \ + > filelist || return 10 + + echo "*** Checksuming reference file set" + cat filelist | while read LINE; do + md5sum $LINE >> md5ref + done || return 20 + + sudo cvmfs_config wipecache + echo "*** Unmounting alice.cern.ch" + cvmfs_umount alice.cern.ch + echo "*** Remounting alice.cern.ch with streaming cache manager" + cvmfs_mount alice.cern.ch \ + "CVMFS_STREAMING_CACHE=yes" \ + "CVMFS_ROOT_HASH=$root_hash" || return 1 + + sudo cvmfs_talk -i alice.cern.ch cache instance + sudo cvmfs_talk -i alice.cern.ch internal affairs | grep ^streaming_cache_mgr + + echo "*** Checksuming with streaming cache mgr" + cat filelist | while read LINE; do + md5sum $LINE >> md5streaming + done || return 30 + + diff md5streaming md5ref || return 40 + + sudo cvmfs_talk -i alice.cern.ch internal affairs | grep ^streaming_cache_mgr + local nreqs=$(sudo cvmfs_talk -i alice.cern.ch internal affairs | \ + grep ^streaming_cache_mgr.n_downloads | cut -d\| -f2) + [ $nreqs -gt 1 ] || return 50 + + return 0 +} diff --git a/test/unittests/CMakeLists.txt b/test/unittests/CMakeLists.txt index f9726cd27f..ed79b5d4c0 100644 --- a/test/unittests/CMakeLists.txt +++ b/test/unittests/CMakeLists.txt @@ -363,6 +363,7 @@ set(CVMFS_UNITTEST_SOURCES t_relaxed_path_filter.cc t_s3fanout.cc t_resolv_conf_event_handler.cc + t_ring_buffer.cc t_sanitizer.cc t_session_context.cc t_session_token.cc @@ -485,6 +486,7 @@ set(CVMFS_UNITTEST_SOURCES ${CVMFS_SOURCE_DIR}/reflog_sql.cc ${CVMFS_SOURCE_DIR}/repository_tag.cc ${CVMFS_SOURCE_DIR}/resolv_conf_event_handler.cc + ${CVMFS_SOURCE_DIR}/ring_buffer.cc ${CVMFS_SOURCE_DIR}/sanitizer.cc ${CVMFS_SOURCE_DIR}/server_tool.cc ${CVMFS_SOURCE_DIR}/session_context.cc diff --git a/test/unittests/t_cache_stream.cc b/test/unittests/t_cache_stream.cc index 0c11087b56..e67863eac9 100644 --- a/test/unittests/t_cache_stream.cc +++ b/test/unittests/t_cache_stream.cc @@ -35,7 +35,8 @@ class T_StreamingCacheManager : public ::testing::Test { PosixCacheManager::Create("cache", true /* alien_cache */); backing_cache_ref_ = backing_cache_.weak_ref(); streaming_cache_ = new StreamingCacheManager( - 32, backing_cache_.Release(), download_mgr_.weak_ref(), NULL); + 32, backing_cache_.Release(), download_mgr_.weak_ref(), NULL, 1000, + statistics_.weak_ref()); EXPECT_TRUE(MkdirDeep("data", 0700)); EXPECT_TRUE(MakeCacheDirectories("data", 0700)); @@ -60,12 +61,39 @@ class T_StreamingCacheManager : public ::testing::Test { shash::Any hash_demo_; }; + TEST_F(T_StreamingCacheManager, Basics) { CacheManager::LabeledObject labeled_obj(hash_demo_); + labeled_obj.label.size = demo_.length(); + int fd = streaming_cache_->Open(labeled_obj); EXPECT_GE(fd, 0); + EXPECT_EQ(0, streaming_cache_->counters().n_downloads->Get()); EXPECT_EQ(static_cast(demo_.length()), streaming_cache_->GetSize(fd)); + EXPECT_EQ(1, streaming_cache_->counters().n_downloads->Get()); + EXPECT_EQ(1, streaming_cache_->counters().n_buffer_objects->Get()); + EXPECT_EQ(static_cast(demo_.length()), + streaming_cache_->counters().sz_transferred_bytes->Get()); + char W = 0; + EXPECT_EQ(1, streaming_cache_->Pread(fd, &W, 1, 7)); + EXPECT_EQ('W', W); + EXPECT_EQ(1, streaming_cache_->counters().n_buffer_hits->Get()); + EXPECT_EQ(0, streaming_cache_->Close(fd)); + EXPECT_EQ(-ENOENT, backing_cache_ref_->Open(labeled_obj)); +} + + +TEST_F(T_StreamingCacheManager, UnknownSize) { + CacheManager::LabeledObject labeled_obj(hash_demo_); + int fd = streaming_cache_->Open(labeled_obj); + EXPECT_GE(fd, 0); + EXPECT_EQ(static_cast(demo_.length()), + streaming_cache_->GetSize(fd)); + EXPECT_EQ(1, streaming_cache_->counters().n_downloads->Get()); + EXPECT_EQ(1, streaming_cache_->counters().n_buffer_obstacles->Get()); + EXPECT_EQ(static_cast(demo_.length()), + streaming_cache_->counters().sz_transferred_bytes->Get()); char W = 0; EXPECT_EQ(1, streaming_cache_->Pread(fd, &W, 1, 7)); EXPECT_EQ('W', W); diff --git a/test/unittests/t_ring_buffer.cc b/test/unittests/t_ring_buffer.cc new file mode 100644 index 0000000000..19e2ef234e --- /dev/null +++ b/test/unittests/t_ring_buffer.cc @@ -0,0 +1,87 @@ +/** + * This file is part of the CernVM File System. + */ + +#include + +#include + +#include "ring_buffer.h" +#include "util/pointer.h" +#include "util/prng.h" + +class T_RingBuffer : public ::testing::Test { + protected: + static const size_t kSize; + + virtual void SetUp() { + ring = new RingBuffer(kSize); + } + + virtual void TearDown() { + ring.Destroy(); + } + + UniquePtr ring; +}; + +const size_t T_RingBuffer::kSize = 997; + +TEST_F(T_RingBuffer, Basics) { + unsigned char buf[kSize]; + memset(buf, 0, kSize); + + EXPECT_EQ(RingBuffer::kInvalidObjectHandle, ring->PushFront(buf, kSize)); + EXPECT_EQ(0u, ring->PushFront(buf, kSize - sizeof(size_t))); + EXPECT_EQ(0u, ring->free_space()); + EXPECT_EQ(0u, ring->RemoveBack()); + EXPECT_EQ(kSize, ring->free_space()); + + const RingBuffer::ObjectHandle_t handle_null = ring->PushFront(NULL, 0); + EXPECT_NE(RingBuffer::kInvalidObjectHandle, handle_null); + EXPECT_EQ(0U, ring->GetObjectSize(handle_null)); + EXPECT_EQ(kSize - sizeof(size_t), ring->free_space()); +} + +TEST_F(T_RingBuffer, Wrap) { + Prng prng; + prng.InitSeed(137); + + const size_t N = kSize / 3 / sizeof(unsigned); + + unsigned buf[N]; + for (unsigned i = 0; i < N; ++i) { + buf[i] = i; + } + + RingBuffer::ObjectHandle_t objects[2]; + objects[0] = ring->PushFront(buf, kSize / 3); + buf[0] = 1; + objects[1] = ring->PushFront(buf, kSize / 3); + + for (unsigned i = 2; i < 1000; ++i) { + EXPECT_EQ(kSize / 3, ring->GetObjectSize(objects[i % 2])); + + unsigned verify[N]; + ring->CopyObject(objects[i % 2], verify); + EXPECT_EQ(2 * ((i - 2) / 2) + (i % 2), verify[0]); + for (unsigned j = 0; j < 100; ++j) { + unsigned begin = prng.Next(N); + const unsigned num = prng.Next(N - begin + 1); + ring->CopySlice(objects[i % 2], num * sizeof(unsigned), + begin * sizeof(unsigned), verify); + const size_t offset = begin; + while (begin < num) { + if (begin != 0) { + EXPECT_EQ(begin, verify[begin - offset]); + } + begin++; + } + } + + EXPECT_EQ(objects[i % 2], ring->RemoveBack()); + + buf[0] = 2 * (i / 2) + (i % 2); + objects[i % 2] = ring->PushFront(buf, kSize / 3); + } +}