Skip to content

Commit

Permalink
Refactored parallel get object request
Browse files Browse the repository at this point in the history
  • Loading branch information
ggtakec committed Nov 3, 2024
1 parent 2d765d1 commit e498fe3
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 80 deletions.
77 changes: 0 additions & 77 deletions src/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1618,83 +1618,6 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
return 0;
}

//
// Retry callback for a failed part of a parallel get object request.
//
// Builds a fresh S3fsCurl object which is set up to download the same
// part(path, fd, start position, size and SSE settings) as the failed one,
// with the retry counter advanced by one.
// Returns nullptr when s3fscurl is null, when the retry limit has already
// been reached, or when the new request could not be set up.
//
std::unique_ptr<S3fsCurl> S3fsCurl::ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl)
{
    if(!s3fscurl){
        return nullptr;
    }

    // give up when this part has already used all of its retries
    if(S3fsCurl::retries <= s3fscurl->retry_count){
        S3FS_PRN_ERR("Over retry count(%d) limit(%s).", s3fscurl->retry_count, s3fscurl->path.c_str());
        return nullptr;
    }

    // duplicate request(setup new curl object)
    std::unique_ptr<S3fsCurl> retrycurl(new S3fsCurl(s3fscurl->IsUseAhbe()));

    const int setup_result = retrycurl->PreGetObjectRequest(s3fscurl->path.c_str(), s3fscurl->partdata.fd, s3fscurl->partdata.startpos, s3fscurl->partdata.size, s3fscurl->b_ssetype, s3fscurl->b_ssevalue);
    if(0 != setup_result){
        S3FS_PRN_ERR("failed downloading part setup(%d)", setup_result);
        return nullptr;
    }

    // the new object inherits the retry history of the failed one
    retrycurl->retry_count = s3fscurl->retry_count + 1;

    return retrycurl;
}

//
// Download the range [start, start + size) of the object tpath into fd.
//
// The range is split into parts of at most S3fsCurl::multipart_size bytes.
// Up to S3fsCurl::max_parallel_cnt parts are put into one S3fsMultiCurl
// batch, and the batches are requested one after another.
// Returns 0 on success(also when size is not positive), the setup error
// code of a part, -EIO when a part could not be added to the batch, or the
// error code returned by the batch request.
//
int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size)
{
    S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

    // SSE settings are looked up once and shared by all parts
    sse_type_t  ssetype = sse_type_t::SSE_DISABLE;
    std::string ssevalue;
    if(!get_object_sse_type(tpath, ssetype, ssevalue)){
        S3FS_PRN_WARN("Failed to get SSE type for file(%s).", SAFESTRPTR(tpath));
    }

    int   result    = 0;
    off_t left_size = size;

    // one pass of this loop is one batch of parallel part downloads
    while(0 < left_size){
        S3fsMultiCurl curlmulti(GetMaxParallelCount());

        // Initialize S3fsMultiCurl(a success callback is not needed)
        curlmulti.SetRetryCallback(S3fsCurl::ParallelGetObjectRetryCallback);

        // Loop for setup parallel download(part) requests.
        int queued_cnt = 0;
        while(queued_cnt < S3fsCurl::max_parallel_cnt && 0 < left_size){
            // part size(the last part may be shorter)
            const off_t part_size = (S3fsCurl::multipart_size < left_size) ? S3fsCurl::multipart_size : left_size;

            // s3fscurl sub object
            std::unique_ptr<S3fsCurl> partcurl(new S3fsCurl(true));
            result = partcurl->PreGetObjectRequest(tpath, fd, (start + size - left_size), part_size, ssetype, ssevalue);
            if(0 != result){
                S3FS_PRN_ERR("failed downloading part setup(%d)", result);
                return result;
            }

            // set into parallel object
            if(!curlmulti.SetS3fsCurlObject(std::move(partcurl))){
                S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
                return -EIO;
            }

            left_size -= part_size;
            ++queued_cnt;
        }

        // Multi request
        result = curlmulti.Request();
        if(0 != result){
            S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
            break;
        }

        // reinit for loop.
        curlmulti.Clear();
    }
    return result;
}

bool S3fsCurl::MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
Expand Down
2 changes: 0 additions & 2 deletions src/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ class S3fsCurl
static std::unique_ptr<S3fsCurl> MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl);

// lazy functions for set curl options
static bool MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl);
Expand Down Expand Up @@ -293,7 +292,6 @@ class S3fsCurl
static std::unique_ptr<S3fsCurl> CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result);
static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd);
static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages);
static int ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size);

// class methods(variables)
static std::string LookupMimeType(const std::string& name);
Expand Down
2 changes: 1 addition & 1 deletion src/fdcache_entity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ int FdEntity::Load(off_t start, off_t size, bool is_modified_flag)
// download
if(S3fsCurl::GetMultipartSize() <= need_load_size && !nomultipart){
// parallel request
result = S3fsCurl::ParallelGetObjectRequest(path.c_str(), physical_fd, iter->offset, need_load_size);
result = parallel_get_object_request(path, physical_fd, iter->offset, need_load_size);
}else{
// single request
if(0 < need_load_size){
Expand Down
175 changes: 175 additions & 0 deletions src/s3fs_threadreqs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,116 @@ void* multipart_put_head_req_threadworker(void* arg)
return reinterpret_cast<void*>(result);
}

//
// Thread Worker function for parallel get object request
//
void* parallel_get_object_req_threadworker(void* arg)
{
auto* pthparam = static_cast<parallel_get_object_req_thparam*>(arg);
if(!pthparam || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->presult){
return reinterpret_cast<void*>(-EIO);
}

// Check retry max count and print debug message
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));

S3FS_PRN_INFO3("Parallel Get Object Request [path=%s][fd=%d][start=%lld][size=%lld][ssetype=%u][ssevalue=%s]", pthparam->path.c_str(), pthparam->fd, static_cast<long long int>(pthparam->start), static_cast<long long int>(pthparam->size), static_cast<uint8_t>(pthparam->ssetype), pthparam->ssevalue.c_str());

if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){
S3FS_PRN_ERR("Multipart Put Head request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount));
return reinterpret_cast<void*>(-EIO);
}
}

S3fsCurl s3fscurl(true);
int result = 0;
while(true){
// Request
result = s3fscurl.GetObjectRequest(pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, pthparam->ssetype, pthparam->ssevalue);

// Check result
bool isResetOffset= true;
CURLcode curlCode = s3fscurl.GetCurlCode();
long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET;
s3fscurl.GetResponseCode(responseCode, false);

if(CURLE_OK == curlCode){
if(responseCode < 400){
// nothing to do
result = 0;
break;

}else if(responseCode == 400){
// as possibly in multipart
S3FS_PRN_WARN("Get Object Request(%s) got 400 response code.", pthparam->path.c_str());

}else if(responseCode == 404){
// set path to not found list
S3FS_PRN_WARN("Get Object Request(%s) got 404 response code.", pthparam->path.c_str());
break;

}else if(responseCode == 500){
// case of all other result, do retry.(11/13/2013)
// because it was found that s3fs got 500 error from S3, but could success
// to retry it.
S3FS_PRN_WARN("Get Object Request(%s) got 500 response code.", pthparam->path.c_str());

// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse
}else if(responseCode == S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET){
// This is a case where the processing result has not yet been updated (should be very rare).
S3FS_PRN_WARN("Get Object Request(%s) could not get any response code.", pthparam->path.c_str());

}else{ // including S3fsCurl::S3FSCURL_RESPONSECODE_FATAL_ERROR
// Retry in other case.
S3FS_PRN_WARN("Get Object Request(%s) got fatal response code.", pthparam->path.c_str());
}

}else if(CURLE_OPERATION_TIMEDOUT == curlCode){
S3FS_PRN_ERR("Get Object Request(%s) is timeouted.", pthparam->path.c_str());
isResetOffset= false;

}else if(CURLE_PARTIAL_FILE == curlCode){
S3FS_PRN_WARN("Get Object Request(%s) is recieved data does not match the given size.", pthparam->path.c_str());
isResetOffset= false;

}else{
S3FS_PRN_WARN("Get Object Request(%s) got the result code(%d: %s)", pthparam->path.c_str(), curlCode, curl_easy_strerror(curlCode));
}

// Check retry max count
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));

++(*(pthparam->pretrycount));
if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){
S3FS_PRN_ERR("Parallel Get Object Request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount));
if(0 == result){
result = -EIO;
}
break;
}
}

// Setup for retry
if(isResetOffset){
S3fsCurl::ResetOffset(&s3fscurl);
}
}

// Set result code
{
const std::lock_guard<std::mutex> lock(*(pthparam->pthparam_lock));
if(0 == *(pthparam->presult) && 0 != result){
// keep first error
*(pthparam->presult) = result;
}
}

return reinterpret_cast<void*>(result);
}

//-------------------------------------------------------------------
// Utility functions
//-------------------------------------------------------------------
Expand Down Expand Up @@ -648,6 +758,71 @@ int multipart_put_head_request(const std::string& strfrom, const std::string& st
return 0;
}

//
// Download a range of an object in parallel via parallel_get_object_req_threadworker
//
// The range [start, start + size) of path is split into chunks of at most
// S3fsCurl::GetMultipartSize() bytes, and one thread pool instruction is
// queued per chunk. All chunks share one mutex, one retry counter and one
// result variable which live on the stack of this function, so this function
// never returns before every queued worker has finished.
// Returns 0 on success(also when size is not positive), -EIO when an
// instruction could not be queued, or the first error reported by a worker.
//
int parallel_get_object_request(const std::string& path, int fd, off_t start, off_t size)
{
    S3FS_PRN_INFO3("[path=%s][fd=%d][start=%lld][size=%lld]", path.c_str(), fd, static_cast<long long int>(start), static_cast<long long int>(size));

    sse_type_t  ssetype = sse_type_t::SSE_DISABLE;
    std::string ssevalue;
    if(!get_object_sse_type(path.c_str(), ssetype, ssevalue)){
        S3FS_PRN_WARN("Failed to get SSE type for file(%s).", path.c_str());
    }

    Semaphore  para_getobj_sem(0);
    std::mutex thparam_lock;
    int        req_count          = 0;
    int        retrycount         = 0;
    int        req_result         = 0;
    bool       is_instruct_failed = false;

    // cycle through the range, pulling off multipart size chunks at a time
    for(off_t remaining_bytes = size, chunk = 0; 0 < remaining_bytes; remaining_bytes -= chunk){
        // chunk size
        chunk = remaining_bytes > S3fsCurl::GetMultipartSize() ? S3fsCurl::GetMultipartSize() : remaining_bytes;

        // parameter for thread worker
        auto* thargs = new parallel_get_object_req_thparam;     // handed over to parallel_get_object_req_threadworker, which is expected to free it
        thargs->path          = path;
        thargs->fd            = fd;
        thargs->start         = (start + size - remaining_bytes);
        thargs->size          = chunk;
        thargs->ssetype       = ssetype;
        thargs->ssevalue      = ssevalue;
        thargs->pthparam_lock = &thparam_lock;
        thargs->pretrycount   = &retrycount;
        thargs->presult       = &req_result;

        // make parameter for thread pool
        thpoolman_param ppoolparam;
        ppoolparam.args  = thargs;
        ppoolparam.psem  = &para_getobj_sem;
        ppoolparam.pfunc = parallel_get_object_req_threadworker;

        // setup instruction
        if(!ThreadPoolMan::Instruct(ppoolparam)){
            S3FS_PRN_ERR("failed setup instruction for parallel get object request(%s).", path.c_str());
            delete thargs;

            // Do not return here: workers which are already queued still
            // reference the local variables of this function, so they must
            // be waited for below before leaving.
            is_instruct_failed = true;
            break;
        }
        ++req_count;
    }

    // wait for finish all requests
    while(req_count > 0){
        para_getobj_sem.acquire();
        --req_count;
    }

    // check result
    if(is_instruct_failed){
        return -EIO;
    }
    if(0 != req_result){
        S3FS_PRN_ERR("error occurred in parallel get object request(errno=%d).", req_result);
        return req_result;
    }
    return 0;
}

/*
* Local variables:
* tab-width: 4
Expand Down
18 changes: 18 additions & 0 deletions src/s3fs_threadreqs.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,22 @@ struct multipart_put_head_req_thparam
int* presult = nullptr;
};

//
// Parallel Get Object Request parameter structure for Thread Pool.
//
// One instance describes one chunk which is downloaded by
// parallel_get_object_req_threadworker. The three pointers refer to
// variables which are shared by all chunks of the same request.
//
struct parallel_get_object_req_thparam
{
    std::string path;                                   // path of the object to download
    int         fd{-1};                                 // file descriptor passed to the get object request
    off_t       start{0};                               // start offset of this chunk
    off_t       size{0};                                // byte size of this chunk
    sse_type_t  ssetype{sse_type_t::SSE_DISABLE};       // SSE type used for the request
    std::string ssevalue;                               // SSE value used for the request
    std::mutex* pthparam_lock{nullptr};                 // lock held while accessing *pretrycount and *presult
    int*        pretrycount{nullptr};                   // retry counter shared by all chunks
    int*        presult{nullptr};                       // receives the first error of any chunk(0 means no error)
};

//-------------------------------------------------------------------
// Thread Worker functions for MultiThread Request
//-------------------------------------------------------------------
Expand All @@ -175,6 +191,7 @@ void* pre_multipart_upload_req_threadworker(void* arg);
void* complete_multipart_upload_threadworker(void* arg);
void* abort_multipart_upload_req_threadworker(void* arg);
void* multipart_put_head_req_threadworker(void* arg);
void* parallel_get_object_req_threadworker(void* arg);

//-------------------------------------------------------------------
// Utility functions
Expand All @@ -183,6 +200,7 @@ int pre_multipart_upload_request(const std::string& path, const headers_t& meta,
int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts);
int abort_multipart_upload_request(const std::string& path, const std::string& upload_id);
int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta);
int parallel_get_object_request(const std::string& path, int fd, off_t start, off_t size);

#endif // S3FS_THREADREQS_H_

Expand Down

0 comments on commit e498fe3

Please sign in to comment.