From e498fe3dd7cdfcfc3c4ca29549379287766f7830 Mon Sep 17 00:00:00 2001 From: Takeshi Nakatani Date: Sat, 17 Aug 2024 06:13:23 +0000 Subject: [PATCH] Refactored parallel get object request --- src/curl.cpp | 77 ------------------ src/curl.h | 2 - src/fdcache_entity.cpp | 2 +- src/s3fs_threadreqs.cpp | 175 ++++++++++++++++++++++++++++++++++++++++ src/s3fs_threadreqs.h | 18 +++++ 5 files changed, 194 insertions(+), 80 deletions(-) diff --git a/src/curl.cpp b/src/curl.cpp index 1bbe0b2503..e732888209 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -1618,83 +1618,6 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me return 0; } -std::unique_ptr S3fsCurl::ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl) -{ - int result; - - if(!s3fscurl){ - return nullptr; - } - if(s3fscurl->retry_count >= S3fsCurl::retries){ - S3FS_PRN_ERR("Over retry count(%d) limit(%s).", s3fscurl->retry_count, s3fscurl->path.c_str()); - return nullptr; - } - - // duplicate request(setup new curl object) - std::unique_ptr newcurl(new S3fsCurl(s3fscurl->IsUseAhbe())); - - if(0 != (result = newcurl->PreGetObjectRequest(s3fscurl->path.c_str(), s3fscurl->partdata.fd, s3fscurl->partdata.startpos, s3fscurl->partdata.size, s3fscurl->b_ssetype, s3fscurl->b_ssevalue))){ - S3FS_PRN_ERR("failed downloading part setup(%d)", result); - return nullptr; - } - newcurl->retry_count = s3fscurl->retry_count + 1; - - return newcurl; -} - -int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size) -{ - S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd); - - sse_type_t ssetype = sse_type_t::SSE_DISABLE; - std::string ssevalue; - if(!get_object_sse_type(tpath, ssetype, ssevalue)){ - S3FS_PRN_WARN("Failed to get SSE type for file(%s).", SAFESTRPTR(tpath)); - } - int result = 0; - off_t remaining_bytes; - - // cycle through open fd, pulling off 10MB chunks at a time - for(remaining_bytes = size; 0 < remaining_bytes; ){ - S3fsMultiCurl curlmulti(GetMaxParallelCount()); - int para_cnt; - off_t chunk; - - // Initialize S3fsMultiCurl - //curlmulti.SetSuccessCallback(nullptr); // not need to set success callback - curlmulti.SetRetryCallback(S3fsCurl::ParallelGetObjectRetryCallback); - - // Loop for setup parallel upload(multipart) request. - for(para_cnt = 0; para_cnt < S3fsCurl::max_parallel_cnt && 0 < remaining_bytes; para_cnt++, remaining_bytes -= chunk){ - // chunk size - chunk = remaining_bytes > S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes; - - // s3fscurl sub object - std::unique_ptr s3fscurl_para(new S3fsCurl(true)); - if(0 != (result = s3fscurl_para->PreGetObjectRequest(tpath, fd, (start + size - remaining_bytes), chunk, ssetype, ssevalue))){ - S3FS_PRN_ERR("failed downloading part setup(%d)", result); - return result; - } - - // set into parallel object - if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){ - S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath); - return -EIO; - } - } - - // Multi request - if(0 != (result = curlmulti.Request())){ - S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - break; - } - - // reinit for loop. - curlmulti.Clear(); - } - return result; -} - bool S3fsCurl::MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl) { if(!s3fscurl){ diff --git a/src/curl.h b/src/curl.h index 06cd7dfaf5..8e830be3fb 100644 --- a/src/curl.h +++ b/src/curl.h @@ -245,7 +245,6 @@ class S3fsCurl static std::unique_ptr MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl); static std::unique_ptr CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl); static std::unique_ptr MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl); - static std::unique_ptr ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl); // lazy functions for set curl options static bool MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl); @@ -293,7 +292,6 @@ class S3fsCurl static std::unique_ptr CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result); static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd); static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages); - static int ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size); // class methods(variables) static std::string LookupMimeType(const std::string& name); diff --git a/src/fdcache_entity.cpp b/src/fdcache_entity.cpp index 2afbeaa699..37bb1cdd37 100644 --- a/src/fdcache_entity.cpp +++ b/src/fdcache_entity.cpp @@ -1136,7 +1136,7 @@ int FdEntity::Load(off_t start, off_t size, bool is_modified_flag) // download if(S3fsCurl::GetMultipartSize() <= need_load_size && !nomultipart){ // parallel request - result = S3fsCurl::ParallelGetObjectRequest(path.c_str(), physical_fd, iter->offset, need_load_size); + result = parallel_get_object_request(path, physical_fd, iter->offset, need_load_size); }else{ // single request if(0 < need_load_size){ diff --git a/src/s3fs_threadreqs.cpp b/src/s3fs_threadreqs.cpp index 90e893ef17..e434ebe3b1 100644 --- a/src/s3fs_threadreqs.cpp +++ b/src/s3fs_threadreqs.cpp @@ -450,6 +450,116 @@ void* multipart_put_head_req_threadworker(void* arg) return reinterpret_cast(result); } +// +// Thread Worker function for parallel get object request +// +void* parallel_get_object_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->presult){ + return reinterpret_cast(-EIO); + } + + // Check retry max count and print debug message + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + + S3FS_PRN_INFO3("Parallel Get Object Request [path=%s][fd=%d][start=%lld][size=%lld][ssetype=%u][ssevalue=%s]", pthparam->path.c_str(), pthparam->fd, static_cast(pthparam->start), static_cast(pthparam->size), static_cast(pthparam->ssetype), pthparam->ssevalue.c_str()); + + if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){ + S3FS_PRN_ERR("Multipart Put Head request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount)); + return reinterpret_cast(-EIO); + } + } + + S3fsCurl s3fscurl(true); + int result = 0; + while(true){ + // Request + result = s3fscurl.GetObjectRequest(pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, pthparam->ssetype, pthparam->ssevalue); + + // Check result + bool isResetOffset= true; + CURLcode curlCode = s3fscurl.GetCurlCode(); + long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET; + s3fscurl.GetResponseCode(responseCode, false); + + if(CURLE_OK == curlCode){ + if(responseCode < 400){ + // nothing to do + result = 0; + break; + + }else if(responseCode == 400){ + // as possibly in multipart + S3FS_PRN_WARN("Get Object Request(%s) got 400 response code.", pthparam->path.c_str()); + + }else if(responseCode == 404){ + // set path to not found list + S3FS_PRN_WARN("Get Object Request(%s) got 404 response code.", pthparam->path.c_str()); + break; + + }else if(responseCode == 500){ + // case of all other result, do retry.(11/13/2013) + // because it was found that s3fs got 500 error from S3, but could success + // to retry it. + S3FS_PRN_WARN("Get Object Request(%s) got 500 response code.", pthparam->path.c_str()); + + // cppcheck-suppress unmatchedSuppression + // cppcheck-suppress knownConditionTrueFalse + }else if(responseCode == S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET){ + // This is a case where the processing result has not yet been updated (should be very rare). + S3FS_PRN_WARN("Get Object Request(%s) could not get any response code.", pthparam->path.c_str()); + + }else{ // including S3fsCurl::S3FSCURL_RESPONSECODE_FATAL_ERROR + // Retry in other case. + S3FS_PRN_WARN("Get Object Request(%s) got fatal response code.", pthparam->path.c_str()); + } + + }else if(CURLE_OPERATION_TIMEDOUT == curlCode){ + S3FS_PRN_ERR("Get Object Request(%s) is timeouted.", pthparam->path.c_str()); + isResetOffset= false; + + }else if(CURLE_PARTIAL_FILE == curlCode){ + S3FS_PRN_WARN("Get Object Request(%s) is recieved data does not match the given size.", pthparam->path.c_str()); + isResetOffset= false; + + }else{ + S3FS_PRN_WARN("Get Object Request(%s) got the result code(%d: %s)", pthparam->path.c_str(), curlCode, curl_easy_strerror(curlCode)); + } + + // Check retry max count + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + + ++(*(pthparam->pretrycount)); + if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){ + S3FS_PRN_ERR("Parallel Get Object Request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount)); + if(0 == result){ + result = -EIO; + } + break; + } + } + + // Setup for retry + if(isResetOffset){ + S3fsCurl::ResetOffset(&s3fscurl); + } + } + + // Set result code + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + if(0 == *(pthparam->presult) && 0 != result){ + // keep first error + *(pthparam->presult) = result; + } + } + + return reinterpret_cast(result); +} + //------------------------------------------------------------------- // Utility functions //------------------------------------------------------------------- @@ -648,6 +758,71 @@ int multipart_put_head_request(const std::string& strfrom, const std::string& st return 0; } +// +// Calls S3fsCurl::ParallelGetObjectRequest via parallel_get_object_req_threadworker +// +int parallel_get_object_request(const std::string& path, int fd, off_t start, off_t size) +{ + S3FS_PRN_INFO3("[path=%s][fd=%d][start=%lld][size=%lld]", path.c_str(), fd, static_cast(start), static_cast(size)); + + sse_type_t ssetype = sse_type_t::SSE_DISABLE; + std::string ssevalue; + if(!get_object_sse_type(path.c_str(), ssetype, ssevalue)){ + S3FS_PRN_WARN("Failed to get SSE type for file(%s).", path.c_str()); + } + + Semaphore para_getobj_sem(0); + std::mutex thparam_lock; + int req_count = 0; + int retrycount = 0; + int req_result = 0; + + // cycle through open fd, pulling off 10MB chunks at a time + for(off_t remaining_bytes = size, chunk = 0; 0 < remaining_bytes; remaining_bytes -= chunk){ + // chunk size + chunk = remaining_bytes > S3fsCurl::GetMultipartSize() ? S3fsCurl::GetMultipartSize() : remaining_bytes; + + // parameter for thread worker + auto* thargs = new parallel_get_object_req_thparam; // free in parallel_get_object_req_threadworker + thargs->path = path; + thargs->fd = fd; + thargs->start = (start + size - remaining_bytes); + thargs->size = chunk; + thargs->ssetype = ssetype; + thargs->ssevalue = ssevalue; + thargs->pthparam_lock = &thparam_lock; + thargs->pretrycount = &retrycount; + thargs->presult = &req_result; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = thargs; + ppoolparam.psem = ¶_getobj_sem; + ppoolparam.pfunc = parallel_get_object_req_threadworker; + + // setup instruction + if(!ThreadPoolMan::Instruct(ppoolparam)){ + S3FS_PRN_ERR("failed setup instruction for one header request."); + delete thargs; + return -EIO; + } + ++req_count; + } + + // wait for finish all requests + while(req_count > 0){ + para_getobj_sem.acquire(); + --req_count; + } + + // check result + if(0 != req_result){ + S3FS_PRN_ERR("error occurred in parallel get object request(errno=%d).", req_result); + return req_result; + } + return 0; +} + /* * Local variables: * tab-width: 4 diff --git a/src/s3fs_threadreqs.h b/src/s3fs_threadreqs.h index a2fea6e2c3..c4258bd5c8 100644 --- a/src/s3fs_threadreqs.h +++ b/src/s3fs_threadreqs.h @@ -161,6 +161,22 @@ struct multipart_put_head_req_thparam int* presult = nullptr; }; +// +// Parallel Get Object Request parameter structure for Thread Pool. +// +struct parallel_get_object_req_thparam +{ + std::string path; + int fd = -1; + off_t start = 0; + off_t size = 0; + sse_type_t ssetype = sse_type_t::SSE_DISABLE; + std::string ssevalue; + std::mutex* pthparam_lock = nullptr; + int* pretrycount = nullptr; + int* presult = nullptr; +}; + //------------------------------------------------------------------- // Thread Worker functions for MultiThread Request //------------------------------------------------------------------- @@ -175,6 +191,7 @@ void* pre_multipart_upload_req_threadworker(void* arg); void* complete_multipart_upload_threadworker(void* arg); void* abort_multipart_upload_req_threadworker(void* arg); void* multipart_put_head_req_threadworker(void* arg); +void* parallel_get_object_req_threadworker(void* arg); //------------------------------------------------------------------- // Utility functions @@ -183,6 +200,7 @@ int pre_multipart_upload_request(const std::string& path, const headers_t& meta, int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts); int abort_multipart_upload_request(const std::string& path, const std::string& upload_id); int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta); +int parallel_get_object_request(const std::string& path, int fd, off_t start, off_t size); #endif // S3FS_THREADREQS_H_