Skip to content

Commit

Permalink
curvefs/client: metric perf
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhongsong committed Dec 20, 2023
1 parent 0c24f4f commit 1c19c66
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 13 deletions.
13 changes: 7 additions & 6 deletions curvefs/src/client/metric/client_metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,18 @@ void AsyncContextCollectMetrics(
std::shared_ptr<S3Metric> s3Metric,
const std::shared_ptr<curve::common::GetObjectAsyncContext>& context) {
if (s3Metric.get() != nullptr) {
CollectMetrics(&s3Metric->adaptorReadS3, context->actualLen,
context->timer.u_elapsed());

CollectMetrics(&s3Metric->adaptorAsyncReadS3, context->actualLen,
butil::cpuwide_time_us() - context->start);

switch (context->type) {
case curve::common::ContextType::Disk:
CollectMetrics(&s3Metric->readFromDiskCache, context->actualLen,
context->timer.u_elapsed());
CollectMetrics(&s3Metric->asyncReadDiskCache, context->actualLen,
butil::cpuwide_time_us() - context->start);
break;
case curve::common::ContextType::S3:
CollectMetrics(&s3Metric->readFromS3, context->actualLen,
context->timer.u_elapsed());
CollectMetrics(&s3Metric->asyncReadFromS3, context->actualLen,
butil::cpuwide_time_us() - context->start);
break;
default:
break;
Expand Down
27 changes: 26 additions & 1 deletion curvefs/src/client/metric/client_metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,17 @@ struct S3Metric {
std::string fsName;
InterfaceMetric adaptorWrite;
InterfaceMetric adaptorRead;

InterfaceMetric adaptorDequeue;
InterfaceMetric adaptorProcess;

InterfaceMetric adaptorAsyncReadS3;
InterfaceMetric asyncReadDiskCache;
InterfaceMetric asyncReadFromS3;

InterfaceMetric waitDownloading;


InterfaceMetric adaptorWriteS3;
InterfaceMetric adaptorWriteDiskCache;
InterfaceMetric adaptorReadS3;
Expand All @@ -270,11 +281,21 @@ struct S3Metric {
bvar::Status<uint32_t> readSize;
bvar::Status<uint32_t> writeSize;

bvar::Adder<uint64_t> readAllHitsMemCounts;
bvar::Adder<uint64_t> readRequestCounts;
bvar::Adder<uint64_t> s3ReadRequestCounts;

explicit S3Metric(const std::string& name = "")
: fsName(!name.empty() ? name
: prefix + curve::common::ToHexString(this)),
adaptorWrite(prefix, fsName + "_adaptor_write"),
adaptorRead(prefix, fsName + "_adaptor_read"),
adaptorDequeue(prefix, fsName + "_adaptor_dequeue"),
adaptorProcess(prefix, fsName + "_adaptor_process"),
adaptorAsyncReadS3(prefix, fsName + "_adaptor_async_read"),
asyncReadDiskCache(prefix, fsName + "_async_read_from_disk"),
asyncReadFromS3(prefix, fsName + "_async_read_from_s3"),
waitDownloading(prefix, fsName + "_wait_download"),
adaptorWriteS3(prefix, fsName + "_adaptor_write_s3"),
adaptorWriteDiskCache(prefix, fsName + "_adaptor_write_disk_cache"),
adaptorReadS3(prefix, fsName + "_adaptor_read_s3"),
Expand All @@ -286,7 +307,11 @@ struct S3Metric {
writeToKVCache(prefix, fsName + "_write_to_kv_cache"),
readFromKVCache(prefix, fsName + "_read_from_kv_cache"),
readSize(prefix, fsName + "_adaptor_read_size", 0),
writeSize(prefix, fsName + "_adaptor_write_size", 0) {}
writeSize(prefix, fsName + "_adaptor_write_size", 0) {
readAllHitsMemCounts.expose_as(prefix, "read_all_hits_mem");
readRequestCounts.expose_as(prefix, "read_request_counts");
s3ReadRequestCounts.expose_as(prefix, "s3_read_request_counts");
}
};

template <typename Tp>
Expand Down
36 changes: 32 additions & 4 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,12 @@ int FileCacheManager::Read(uint64_t inodeId, uint64_t offset, uint64_t length,
ReadFromMemCache(offset, length, dataBuf, &actualReadLen,
&memCacheMissRequest);
if (memCacheMissRequest.empty()) {
if (s3ClientAdaptor_->s3Metric_) {
s3ClientAdaptor_->s3Metric_->readAllHitsMemCounts << 1;
}
return actualReadLen;
} else {
s3ClientAdaptor_->s3Metric_->readRequestCounts << memCacheMissRequest.size();
}
VLOG(6) << "memcache miss request size: " << memCacheMissRequest.size();

Expand Down Expand Up @@ -579,22 +584,34 @@ bool FileCacheManager::ReadKVRequestFromS3(const std::string &name,
}

FileCacheManager::ReadStatus
FileCacheManager::ReadKVRequest(const std::vector<S3ReadRequest> &kvRequests,
FileCacheManager::ReadKVRequest(std::vector<S3ReadRequest> &kvRequests,
char *dataBuf, uint64_t fileLen) {
absl::BlockingCounter counter(kvRequests.size());
std::once_flag cancelFlag;
std::atomic<bool> isCanceled{false};
std::atomic<int> retCode{0};

for (const auto &req : kvRequests) {
for (auto &req : kvRequests) {
req.enqueue = butil::cpuwide_time_us();
readTaskPool_->Enqueue([&]() {
auto defer = absl::MakeCleanup([&]() { counter.DecrementCount(); });
if (isCanceled) {
LOG(WARNING) << "kv request is canceled " << req.DebugString();
return;
}
req.dequeue = butil::cpuwide_time_us() - req.enqueue;
ProcessKVRequest(req, dataBuf, fileLen, cancelFlag, isCanceled,
retCode);
req.processed = butil::cpuwide_time_us() - req.enqueue;

if (s3ClientAdaptor_->s3Metric_) {
curve::client::CollectMetrics(
&s3ClientAdaptor_->s3Metric_->adaptorDequeue, req.len, req.dequeue);
curve::client::CollectMetrics(
&s3ClientAdaptor_->s3Metric_->adaptorProcess, req.len, req.processed);
s3ClientAdaptor_->s3Metric_->s3ReadRequestCounts << 1;
}

});
}

Expand All @@ -620,7 +637,10 @@ void FileCacheManager::ProcessKVRequest(const S3ReadRequest &req, char *dataBuf,
std::string prefetchName = curvefs::common::s3util::GenObjName(
req.chunkId, blockIndex, req.compaction, req.fsId, req.inodeId,
objectPrefix);

uint64_t start = butil::cpuwide_time_us();
bool waitDownloading = false;

// if obj is in downloading, wait for it.
while (true) {
{
Expand All @@ -642,6 +662,12 @@ void FileCacheManager::ProcessKVRequest(const S3ReadRequest &req, char *dataBuf,
}
}

if (waitDownloading && s3ClientAdaptor_->s3Metric_) {
curve::client::CollectMetrics(
&s3ClientAdaptor_->s3Metric_->waitDownloading, req.len, butil::cpuwide_time_us() - start);
}


// prefetch
if (s3ClientAdaptor_->HasDiskCache() && !waitDownloading &&
!IsCachedInLocal(prefetchName)) {
Expand Down Expand Up @@ -852,15 +878,17 @@ void FileCacheManager::PrefetchS3Objs(
if (fromS3) {
auto context = std::make_shared<GetObjectAsyncContext>(
name, dataCacheS3, 0, readLen,
AsyncPrefetchCallback{inode_, s3ClientAdaptor_, true});
AsyncPrefetchCallback{inode_, s3ClientAdaptor_, true}, ContextType::S3);
context->start = butil::cpuwide_time_us();
auto task = [this, context]() {
s3ClientAdaptor_->GetS3Client()->DownloadAsync(context);
};
s3ClientAdaptor_->PushAsyncTask(task);
} else {
auto context = std::make_shared<GetObjectAsyncContext>(
name, dataCacheS3, 0, readLen,
AsyncPrefetchCallback{inode_, s3ClientAdaptor_, false});
AsyncPrefetchCallback{inode_, s3ClientAdaptor_, false}, ContextType::Disk);
context->start = butil::cpuwide_time_us();
kvClientManager_->Enqueue(context);
}
}
Expand Down
6 changes: 5 additions & 1 deletion curvefs/src/client/s3/client_s3_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ using WeakDataCachePtr = std::weak_ptr<DataCache>;
using curve::common::GetObjectAsyncCallBack;
using curve::common::PutObjectAsyncCallBack;
using curve::common::S3Adapter;
using curve::common::ContextType;
using curvefs::metaserver::Inode;
using curvefs::metaserver::S3ChunkInfo;
using curvefs::metaserver::S3ChunkInfoList;
Expand Down Expand Up @@ -96,6 +97,9 @@ struct S3ReadRequest {
uint64_t fsId;
uint64_t inodeId;
uint64_t compaction;
uint64_t enqueue;
uint64_t dequeue;
uint64_t processed;

std::string DebugString() const {
std::ostringstream os;
Expand Down Expand Up @@ -426,7 +430,7 @@ class FileCacheManager {
}

// read kv request, need
ReadStatus ReadKVRequest(const std::vector<S3ReadRequest> &kvRequests,
ReadStatus ReadKVRequest(std::vector<S3ReadRequest> &kvRequests,
char *dataBuf, uint64_t fileLen);

// thread function for ReadKVRequest
Expand Down
4 changes: 3 additions & 1 deletion src/common/s3_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ struct GetObjectAsyncContext : public Aws::Client::AsyncCallerContext {
GetObjectAsyncCallBack cb;
butil::Timer timer;
ContextType type = ContextType::Unkown;
uint64_t start;

explicit GetObjectAsyncContext(
std::string key, char* buf, off_t offset, size_t len,
Expand All @@ -143,7 +144,8 @@ struct GetObjectAsyncContext : public Aws::Client::AsyncCallerContext {
len(len),
cb(std::move(cb)),
type(type),
timer(butil::Timer::STARTED) {}
timer(butil::Timer::STARTED),
start(0) {}
};

/*
Expand Down

0 comments on commit 1c19c66

Please sign in to comment.