Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs/client: perf opt for metric #2973

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions curvefs/src/client/metric/client_metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,18 @@ void AsyncContextCollectMetrics(
std::shared_ptr<S3Metric> s3Metric,
const std::shared_ptr<curve::common::GetObjectAsyncContext>& context) {
if (s3Metric.get() != nullptr) {
CollectMetrics(&s3Metric->adaptorReadS3, context->actualLen,
context->timer.u_elapsed());

CollectMetrics(&s3Metric->adaptorAsyncReadS3, context->actualLen,
butil::cpuwide_time_us() - context->start);

switch (context->type) {
case curve::common::ContextType::Disk:
CollectMetrics(&s3Metric->readFromDiskCache, context->actualLen,
context->timer.u_elapsed());
CollectMetrics(&s3Metric->asyncReadDiskCache, context->actualLen,
butil::cpuwide_time_us() - context->start);
break;
case curve::common::ContextType::S3:
CollectMetrics(&s3Metric->readFromS3, context->actualLen,
context->timer.u_elapsed());
CollectMetrics(&s3Metric->asyncReadFromS3, context->actualLen,
butil::cpuwide_time_us() - context->start);
break;
default:
break;
Expand Down
27 changes: 26 additions & 1 deletion curvefs/src/client/metric/client_metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,17 @@ struct S3Metric {
std::string fsName;
InterfaceMetric adaptorWrite;
InterfaceMetric adaptorRead;

InterfaceMetric adaptorDequeue;
InterfaceMetric adaptorProcess;

InterfaceMetric adaptorAsyncReadS3;
InterfaceMetric asyncReadDiskCache;
InterfaceMetric asyncReadFromS3;

InterfaceMetric waitDownloading;


InterfaceMetric adaptorWriteS3;
InterfaceMetric adaptorWriteDiskCache;
InterfaceMetric adaptorReadS3;
Expand All @@ -270,11 +281,21 @@ struct S3Metric {
bvar::Status<uint32_t> readSize;
bvar::Status<uint32_t> writeSize;

bvar::Adder<uint64_t> readAllHitsMemCounts;
bvar::Adder<uint64_t> readRequestCounts;
bvar::Adder<uint64_t> s3ReadRequestCounts;

explicit S3Metric(const std::string& name = "")
: fsName(!name.empty() ? name
: prefix + curve::common::ToHexString(this)),
adaptorWrite(prefix, fsName + "_adaptor_write"),
adaptorRead(prefix, fsName + "_adaptor_read"),
adaptorDequeue(prefix, fsName + "_adaptor_dequeue"),
adaptorProcess(prefix, fsName + "_adaptor_process"),
adaptorAsyncReadS3(prefix, fsName + "_adaptor_async_read"),
asyncReadDiskCache(prefix, fsName + "_async_read_from_disk"),
asyncReadFromS3(prefix, fsName + "_async_read_from_s3"),
waitDownloading(prefix, fsName + "_wait_download"),
adaptorWriteS3(prefix, fsName + "_adaptor_write_s3"),
adaptorWriteDiskCache(prefix, fsName + "_adaptor_write_disk_cache"),
adaptorReadS3(prefix, fsName + "_adaptor_read_s3"),
Expand All @@ -286,7 +307,11 @@ struct S3Metric {
writeToKVCache(prefix, fsName + "_write_to_kv_cache"),
readFromKVCache(prefix, fsName + "_read_from_kv_cache"),
readSize(prefix, fsName + "_adaptor_read_size", 0),
writeSize(prefix, fsName + "_adaptor_write_size", 0) {}
writeSize(prefix, fsName + "_adaptor_write_size", 0) {
readAllHitsMemCounts.expose_as(prefix, "read_all_hits_mem");
readRequestCounts.expose_as(prefix, "read_request_counts");
s3ReadRequestCounts.expose_as(prefix, "s3_read_request_counts");
}
};

template <typename Tp>
Expand Down
38 changes: 34 additions & 4 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,15 @@ int FileCacheManager::Read(uint64_t inodeId, uint64_t offset, uint64_t length,
ReadFromMemCache(offset, length, dataBuf, &actualReadLen,
&memCacheMissRequest);
if (memCacheMissRequest.empty()) {
if (s3ClientAdaptor_->s3Metric_) {
s3ClientAdaptor_->s3Metric_->readAllHitsMemCounts << 1;
}
return actualReadLen;
}
if (s3ClientAdaptor_->s3Metric_) {

s3ClientAdaptor_->s3Metric_->readRequestCounts << memCacheMissRequest.size();
}
VLOG(6) << "memcache miss request size: " << memCacheMissRequest.size();

// 2. read from localcache and remote cluster
Expand Down Expand Up @@ -579,22 +586,34 @@ bool FileCacheManager::ReadKVRequestFromS3(const std::string &name,
}

FileCacheManager::ReadStatus
FileCacheManager::ReadKVRequest(const std::vector<S3ReadRequest> &kvRequests,
FileCacheManager::ReadKVRequest(std::vector<S3ReadRequest> &kvRequests,
char *dataBuf, uint64_t fileLen) {
absl::BlockingCounter counter(kvRequests.size());
std::once_flag cancelFlag;
std::atomic<bool> isCanceled{false};
std::atomic<int> retCode{0};

for (const auto &req : kvRequests) {
for (auto &req : kvRequests) {
req.enqueue = butil::cpuwide_time_us();
readTaskPool_->Enqueue([&]() {
auto defer = absl::MakeCleanup([&]() { counter.DecrementCount(); });
if (isCanceled) {
LOG(WARNING) << "kv request is canceled " << req.DebugString();
return;
}
req.dequeue = butil::cpuwide_time_us() - req.enqueue;
ProcessKVRequest(req, dataBuf, fileLen, cancelFlag, isCanceled,
retCode);
req.processed = butil::cpuwide_time_us() - req.enqueue;

if (s3ClientAdaptor_->s3Metric_) {
curve::client::CollectMetrics(
&s3ClientAdaptor_->s3Metric_->adaptorDequeue, req.len, req.dequeue);
curve::client::CollectMetrics(
&s3ClientAdaptor_->s3Metric_->adaptorProcess, req.len, req.processed);
s3ClientAdaptor_->s3Metric_->s3ReadRequestCounts << 1;
}

});
}

Expand All @@ -620,7 +639,10 @@ void FileCacheManager::ProcessKVRequest(const S3ReadRequest &req, char *dataBuf,
std::string prefetchName = curvefs::common::s3util::GenObjName(
req.chunkId, blockIndex, req.compaction, req.fsId, req.inodeId,
objectPrefix);

uint64_t start = butil::cpuwide_time_us();
bool waitDownloading = false;

// if obj is in downloading, wait for it.
while (true) {
{
Expand All @@ -642,6 +664,12 @@ void FileCacheManager::ProcessKVRequest(const S3ReadRequest &req, char *dataBuf,
}
}

if (waitDownloading && s3ClientAdaptor_->s3Metric_) {
curve::client::CollectMetrics(
&s3ClientAdaptor_->s3Metric_->waitDownloading, req.len, butil::cpuwide_time_us() - start);
}


// prefetch
if (s3ClientAdaptor_->HasDiskCache() && !waitDownloading &&
!IsCachedInLocal(prefetchName)) {
Expand Down Expand Up @@ -852,15 +880,17 @@ void FileCacheManager::PrefetchS3Objs(
if (fromS3) {
auto context = std::make_shared<GetObjectAsyncContext>(
name, dataCacheS3, 0, readLen,
AsyncPrefetchCallback{inode_, s3ClientAdaptor_, true});
AsyncPrefetchCallback{inode_, s3ClientAdaptor_, true}, ContextType::S3);
context->start = butil::cpuwide_time_us();
auto task = [this, context]() {
s3ClientAdaptor_->GetS3Client()->DownloadAsync(context);
};
s3ClientAdaptor_->PushAsyncTask(task);
} else {
auto context = std::make_shared<GetObjectAsyncContext>(
name, dataCacheS3, 0, readLen,
AsyncPrefetchCallback{inode_, s3ClientAdaptor_, false});
AsyncPrefetchCallback{inode_, s3ClientAdaptor_, false}, ContextType::Disk);
context->start = butil::cpuwide_time_us();
kvClientManager_->Enqueue(context);
}
}
Expand Down
6 changes: 5 additions & 1 deletion curvefs/src/client/s3/client_s3_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ using WeakDataCachePtr = std::weak_ptr<DataCache>;
using curve::common::GetObjectAsyncCallBack;
using curve::common::PutObjectAsyncCallBack;
using curve::common::S3Adapter;
using curve::common::ContextType;
using curvefs::metaserver::Inode;
using curvefs::metaserver::S3ChunkInfo;
using curvefs::metaserver::S3ChunkInfoList;
Expand Down Expand Up @@ -96,6 +97,9 @@ struct S3ReadRequest {
uint64_t fsId;
uint64_t inodeId;
uint64_t compaction;
uint64_t enqueue;
uint64_t dequeue;
uint64_t processed;

std::string DebugString() const {
std::ostringstream os;
Expand Down Expand Up @@ -426,7 +430,7 @@ class FileCacheManager {
}

// read kv request, need
ReadStatus ReadKVRequest(const std::vector<S3ReadRequest> &kvRequests,
ReadStatus ReadKVRequest(std::vector<S3ReadRequest> &kvRequests,
char *dataBuf, uint64_t fileLen);

// thread function for ReadKVRequest
Expand Down
4 changes: 3 additions & 1 deletion src/common/s3_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ struct GetObjectAsyncContext : public Aws::Client::AsyncCallerContext {
GetObjectAsyncCallBack cb;
butil::Timer timer;
ContextType type = ContextType::Unkown;
uint64_t start;

explicit GetObjectAsyncContext(
std::string key, char* buf, off_t offset, size_t len,
Expand All @@ -143,7 +144,8 @@ struct GetObjectAsyncContext : public Aws::Client::AsyncCallerContext {
len(len),
cb(std::move(cb)),
type(type),
timer(butil::Timer::STARTED) {}
timer(butil::Timer::STARTED),
start(0) {}
};

/*
Expand Down