From b7db8aa541751c75f22641a8a2d0d83446be465a Mon Sep 17 00:00:00 2001 From: Takeshi Nakatani Date: Mon, 15 Jul 2024 06:40:05 +0000 Subject: [PATCH] Refactored single type requests to use through ThreadPoolMan --- src/Makefile.am | 1 + src/curl.cpp | 157 +++++------ src/curl.h | 41 ++- src/fdcache_entity.cpp | 354 +++++++++++++++++------- src/fdcache_entity.h | 8 +- src/fdcache_fdinfo.cpp | 68 +++-- src/fdcache_fdinfo.h | 19 +- src/mpu_util.cpp | 7 +- src/s3fs.cpp | 333 +++++++++++----------- src/s3fs_cred.cpp | 99 +++---- src/s3fs_cred.h | 20 +- src/s3fs_help.cpp | 2 +- src/s3fs_threadreqs.cpp | 591 ++++++++++++++++++++++++++++++++++++++++ src/s3fs_threadreqs.h | 187 +++++++++++++ src/threadpoolman.cpp | 31 +++ src/threadpoolman.h | 1 + 16 files changed, 1463 insertions(+), 456 deletions(-) create mode 100644 src/s3fs_threadreqs.cpp create mode 100644 src/s3fs_threadreqs.h diff --git a/src/Makefile.am b/src/Makefile.am index b2fdf12a17..3bb2b1bef2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -43,6 +43,7 @@ s3fs_SOURCES = \ string_util.cpp \ s3fs_cred.cpp \ s3fs_util.cpp \ + s3fs_threadreqs.cpp \ fdcache.cpp \ fdcache_entity.cpp \ fdcache_page.cpp \ diff --git a/src/curl.cpp b/src/curl.cpp index a8d674b7e1..d2c8263b97 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -42,6 +42,7 @@ #include "s3fs_util.h" #include "string_util.h" #include "addhead.h" +#include "s3fs_threadreqs.h" //------------------------------------------------------------------- // Symbols @@ -1216,28 +1217,28 @@ bool S3fsCurl::SetIPResolveType(const char* value) // cppcheck-suppress unmatchedSuppression // cppcheck-suppress constParameter // cppcheck-suppress constParameterCallback -bool S3fsCurl::UploadMultipartPostCallback(S3fsCurl* s3fscurl, void* param) +bool S3fsCurl::MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param) { if(!s3fscurl || param){ // this callback does not need a parameter return false; } - return s3fscurl->UploadMultipartPostComplete(); + return s3fscurl->MultipartUploadPartComplete(); } // cppcheck-suppress unmatchedSuppression // cppcheck-suppress constParameter // cppcheck-suppress constParameterCallback -bool S3fsCurl::MixMultipartPostCallback(S3fsCurl* s3fscurl, void* param) +bool S3fsCurl::MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param) { if(!s3fscurl || param){ // this callback does not need a parameter return false; } - return s3fscurl->MixMultipartPostComplete(); + return s3fscurl->MixMultipartUploadComplete(); } -std::unique_ptr S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl) +std::unique_ptr S3fsCurl::MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl) { if(!s3fscurl){ return nullptr; @@ -1277,14 +1278,14 @@ std::unique_ptr S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s newcurl->type = s3fscurl->type; // setup new curl object - if(0 != newcurl->UploadMultipartPostSetup(s3fscurl->path.c_str(), part_num, upload_id)){ + if(0 != newcurl->MultipartUploadPartSetup(s3fscurl->path.c_str(), part_num, upload_id)){ S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num); return nullptr; } return newcurl; } -std::unique_ptr S3fsCurl::CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl) +std::unique_ptr S3fsCurl::CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl) { if(!s3fscurl){ return nullptr; @@ -1321,23 +1322,23 @@ std::unique_ptr S3fsCurl::CopyMultipartPostRetryCallback(S3fsCurl* s3f newcurl->type = s3fscurl->type; // setup new curl object - if(0 != newcurl->CopyMultipartPostSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){ + if(0 != newcurl->CopyMultipartUploadSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){ S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num); return nullptr; } return newcurl; } -std::unique_ptr S3fsCurl::MixMultipartPostRetryCallback(S3fsCurl* s3fscurl) +std::unique_ptr S3fsCurl::MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl) { if(!s3fscurl){ return nullptr; } if(-1 == s3fscurl->partdata.fd){ - return S3fsCurl::CopyMultipartPostRetryCallback(s3fscurl); + return S3fsCurl::CopyMultipartUploadRetryCallback(s3fscurl); }else{ - return S3fsCurl::UploadMultipartPostRetryCallback(s3fscurl); + return S3fsCurl::MultipartUploadPartRetryCallback(s3fscurl); } } @@ -1395,7 +1396,7 @@ std::unique_ptr S3fsCurl::CreateParallelS3fsCurl(const char* tpath, in S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(start), static_cast(size), part_num); - if(0 != (result = s3fscurl->UploadMultipartPostSetup(tpath, part_num, upload_id))){ + if(0 != (result = s3fscurl->MultipartUploadPartSetup(tpath, part_num, upload_id))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return nullptr; } @@ -1416,7 +1417,7 @@ std::unique_ptr S3fsCurl::CreateParallelS3fsCurl(const char* tpath, in S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(start), static_cast(size), part_num); - if(0 != (result = s3fscurl->CopyMultipartPostSetup(tpath, tpath, part_num, upload_id, meta))){ + if(0 != (result = s3fscurl->CopyMultipartUploadSetup(tpath, tpath, part_num, upload_id, meta))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return nullptr; } @@ -1431,14 +1432,13 @@ std::unique_ptr S3fsCurl::CreateParallelS3fsCurl(const char* tpath, in return s3fscurl; } -int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd) +int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd) { int result; std::string upload_id; struct stat st; etaglist_t list; off_t remaining_bytes; - S3fsCurl s3fscurl(true); S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd); @@ -1447,15 +1447,14 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, return -errno; } - if(0 != (result = s3fscurl.PreMultipartPostRequest(tpath, meta, upload_id))){ + if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){ return result; } - s3fscurl.DestroyCurlHandle(); // Initialize S3fsMultiCurl S3fsMultiCurl curlmulti(GetMaxParallelCount()); - curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback); - curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback); + curlmulti.SetSuccessCallback(S3fsCurl::MultipartUploadPartCallback); + curlmulti.SetRetryCallback(S3fsCurl::MultipartUploadPartRetryCallback); // cycle through open fd, pulling off 10MB chunks at a time for(remaining_bytes = st.st_size; 0 < remaining_bytes; ){ @@ -1471,7 +1470,7 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, s3fscurl_para->partdata.add_etag_list(list); // initiate upload part for parallel - if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ + if(0 != (result = s3fscurl_para->MultipartUploadPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return result; } @@ -1487,18 +1486,14 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, // Multi request if(0 != (result = curlmulti.Request())){ S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - - S3fsCurl s3fscurl_abort(true); - int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id); - s3fscurl_abort.DestroyCurlHandle(); - if(result2 != 0){ + int result2; + if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){ S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2); } - return result; } - if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){ + if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){ return result; } return 0; @@ -1510,7 +1505,6 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me std::string upload_id; struct stat st; etaglist_t list; - S3fsCurl s3fscurl(true); S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd); @@ -1519,10 +1513,9 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me return -errno; } - if(0 != (result = s3fscurl.PreMultipartPostRequest(tpath, meta, upload_id))){ + if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){ return result; } - s3fscurl.DestroyCurlHandle(); // for copy multipart std::string srcresource; @@ -1533,8 +1526,8 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me // Initialize S3fsMultiCurl S3fsMultiCurl curlmulti(GetMaxParallelCount()); - curlmulti.SetSuccessCallback(S3fsCurl::MixMultipartPostCallback); - curlmulti.SetRetryCallback(S3fsCurl::MixMultipartPostRetryCallback); + curlmulti.SetSuccessCallback(S3fsCurl::MixMultipartUploadCallback); + curlmulti.SetRetryCallback(S3fsCurl::MixMultipartUploadRetryCallback); for(auto iter = mixuppages.cbegin(); iter != mixuppages.cend(); ++iter){ if(iter->modified){ @@ -1550,7 +1543,7 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(iter->offset), static_cast(iter->bytes), s3fscurl_para->partdata.get_part_number()); // initiate upload part for parallel - if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ + if(0 != (result = s3fscurl_para->MultipartUploadPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return result; } @@ -1588,7 +1581,7 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(iter->offset + i), static_cast(bytes), s3fscurl_para->partdata.get_part_number()); // initiate upload part for parallel - if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ + if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return result; } @@ -1605,17 +1598,14 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me // Multi request if(0 != (result = curlmulti.Request())){ S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - - S3fsCurl s3fscurl_abort(true); - int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id); - s3fscurl_abort.DestroyCurlHandle(); - if(result2 != 0){ + int result2; + if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){ S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2); } return result; } - if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){ + if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){ return result; } return 0; @@ -1698,7 +1688,7 @@ int S3fsCurl::ParallelGetObjectRequest(const char* tpath, int fd, off_t start, o return result; } -bool S3fsCurl::UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl) +bool S3fsCurl::MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl) { if(!s3fscurl){ return false; @@ -1740,7 +1730,7 @@ bool S3fsCurl::UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl) return true; } -bool S3fsCurl::CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl) +bool S3fsCurl::CopyMultipartUploadSetCurlOpts(S3fsCurl* s3fscurl) { if(!s3fscurl){ return false; @@ -3406,7 +3396,7 @@ int S3fsCurl::HeadRequest(const char* tpath, headers_t& meta) return 0; } -int S3fsCurl::PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy) +int S3fsCurl::PutHeadRequest(const char* tpath, const headers_t& meta, bool is_copy) { S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath)); @@ -3694,22 +3684,17 @@ int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, off_t return 0; } -int S3fsCurl::GetObjectRequest(const char* tpath, int fd, off_t start, off_t size) +int S3fsCurl::GetObjectRequest(const char* tpath, int fd, off_t start, off_t size, sse_type_t ssetype, const std::string& ssevalue) { int result; - S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld]", SAFESTRPTR(tpath), static_cast(start), static_cast(size)); + S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][ssetype=%u][ssevalue=%s]", SAFESTRPTR(tpath), static_cast(start), static_cast(size), static_cast(ssetype), ssevalue.c_str()); if(!tpath){ return -EINVAL; } - sse_type_t local_ssetype = sse_type_t::SSE_DISABLE; - std::string ssevalue; - if(!get_object_sse_type(tpath, local_ssetype, ssevalue)){ - S3FS_PRN_WARN("Failed to get SSE type for file(%s).", SAFESTRPTR(tpath)); - } - if(0 != (result = PreGetObjectRequest(tpath, fd, start, size, local_ssetype, ssevalue))){ + if(0 != (result = PreGetObjectRequest(tpath, fd, start, size, ssetype, ssevalue))){ return result; } if(!fpLazySetup || !fpLazySetup(this)){ @@ -3868,7 +3853,7 @@ int S3fsCurl::ListBucketRequest(const char* tpath, const char* query) // Date: Mon, 1 Nov 2010 20:34:56 GMT // Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZCBieSBlbHZpbmc= // -int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, std::string& upload_id) +int S3fsCurl::PreMultipartUploadRequest(const char* tpath, const headers_t& meta, std::string& upload_id) { S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath)); @@ -3971,7 +3956,7 @@ int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, std::s return 0; } -int S3fsCurl::CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, const etaglist_t& parts) +int S3fsCurl::MultipartUploadComplete(const char* tpath, const std::string& upload_id, const etaglist_t& parts) { S3FS_PRN_INFO3("[tpath=%s][parts=%zu]", SAFESTRPTR(tpath), parts.size()); @@ -4179,7 +4164,7 @@ int S3fsCurl::AbortMultipartUpload(const char* tpath, const std::string& upload_ // Content-MD5: pUNXr/BjKK5G2UKvaRRrOA== // Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZGGieSRlbHZpbmc= // -int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, const std::string& upload_id) +int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id) { S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(partdata.startpos), static_cast(partdata.size), part_num); @@ -4234,30 +4219,30 @@ int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, const st type = REQTYPE::UPLOADMULTIPOST; // set lazy function - fpLazySetup = UploadMultipartPostSetCurlOpts; + fpLazySetup = MultipartUploadPartSetCurlOpts; return 0; } -int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, const std::string& upload_id) +int S3fsCurl::MultipartUploadPartRequest(const char* tpath, int part_num, const std::string& upload_id) { int result; S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(partdata.startpos), static_cast(partdata.size), part_num); // setup - if(0 != (result = S3fsCurl::UploadMultipartPostSetup(tpath, part_num, upload_id))){ + if(0 != (result = S3fsCurl::MultipartUploadPartSetup(tpath, part_num, upload_id))){ return result; } if(!fpLazySetup || !fpLazySetup(this)){ - S3FS_PRN_ERR("Failed to lazy setup in multipart upload post request."); + S3FS_PRN_ERR("Failed to lazy setup in multipart upload part request."); return -EIO; } // request if(0 == (result = RequestPerform())){ - if(!UploadMultipartPostComplete()){ + if(!MultipartUploadPartComplete()){ result = -EIO; } } @@ -4269,7 +4254,7 @@ int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, const return result; } -int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta) +int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta) { S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num); @@ -4327,7 +4312,7 @@ int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_ type = REQTYPE::COPYMULTIPOST; // set lazy function - fpLazySetup = CopyMultipartPostSetCurlOpts; + fpLazySetup = CopyMultipartUploadSetCurlOpts; // request S3FS_PRN_INFO3("copying... [from=%s][to=%s][part=%d]", from, to, part_num); @@ -4335,7 +4320,7 @@ int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_ return 0; } -bool S3fsCurl::UploadMultipartPostComplete() +bool S3fsCurl::MultipartUploadPartComplete() { auto it = responseHeaders.find("ETag"); if (it == responseHeaders.cend()) { @@ -4364,7 +4349,7 @@ bool S3fsCurl::UploadMultipartPostComplete() // cppcheck-suppress unmatchedSuppression // cppcheck-suppress constParameter // cppcheck-suppress constParameterCallback -bool S3fsCurl::CopyMultipartPostCallback(S3fsCurl* s3fscurl, void* param) +bool S3fsCurl::CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param) { if(!s3fscurl || param){ // this callback does not need a parameter return false; @@ -4372,10 +4357,10 @@ bool S3fsCurl::CopyMultipartPostCallback(S3fsCurl* s3fscurl, void* param) // cppcheck-suppress unmatchedSuppression // cppcheck-suppress knownConditionTrueFalse - return s3fscurl->CopyMultipartPostComplete(); + return s3fscurl->CopyMultipartUploadComplete(); } -bool S3fsCurl::CopyMultipartPostComplete() +bool S3fsCurl::CopyMultipartUploadComplete() { std::string etag; partdata.uploaded = simple_parse_xml(bodydata.c_str(), bodydata.size(), "ETag", etag); @@ -4387,13 +4372,13 @@ bool S3fsCurl::CopyMultipartPostComplete() return true; } -bool S3fsCurl::MixMultipartPostComplete() +bool S3fsCurl::MixMultipartUploadComplete() { bool result; if(-1 == partdata.fd){ - result = CopyMultipartPostComplete(); + result = CopyMultipartUploadComplete(); }else{ - result = UploadMultipartPostComplete(); + result = MultipartUploadPartComplete(); } return result; } @@ -4408,15 +4393,15 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath)); - if(0 != (result = PreMultipartPostRequest(tpath, meta, upload_id))){ + if(0 != (result = PreMultipartUploadRequest(tpath, meta, upload_id))){ return result; } DestroyCurlHandle(); // Initialize S3fsMultiCurl S3fsMultiCurl curlmulti(GetMaxParallelCount()); - curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartPostCallback); - curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartPostRetryCallback); + curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback); + curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback); for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){ chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining; @@ -4432,7 +4417,7 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met s3fscurl_para->partdata.add_etag_list(list); // initiate upload part for parallel - if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ + if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return result; } @@ -4447,17 +4432,14 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met // Multi request if(0 != (result = curlmulti.Request())){ S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - - S3fsCurl s3fscurl_abort(true); - int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id); - s3fscurl_abort.DestroyCurlHandle(); - if(result2 != 0){ + int result2; + if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){ S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2); } return result; } - if(0 != (result = CompleteMultipartPostRequest(tpath, upload_id, list))){ + if(0 != (result = MultipartUploadComplete(tpath, upload_id, list))){ return result; } return 0; @@ -4477,7 +4459,7 @@ int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* t // upload part int result; - if(0 != (result = UploadMultipartPostRequest(tpath, petagpair->part_num, upload_id))){ + if(0 != (result = MultipartUploadPartRequest(tpath, petagpair->part_num, upload_id))){ S3FS_PRN_ERR("failed uploading %d part by error(%d)", petagpair->part_num, result); return result; } @@ -4503,15 +4485,15 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t meta["Content-Type"] = S3fsCurl::LookupMimeType(to); meta["x-amz-copy-source"] = srcresource; - if(0 != (result = PreMultipartPostRequest(to, meta, upload_id))){ + if(0 != (result = PreMultipartUploadRequest(to, meta, upload_id))){ return result; } DestroyCurlHandle(); // Initialize S3fsMultiCurl S3fsMultiCurl curlmulti(GetMaxParallelCount()); - curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartPostCallback); - curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartPostRetryCallback); + curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback); + curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback); for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){ chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining; @@ -4527,7 +4509,7 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t s3fscurl_para->partdata.add_etag_list(list); // initiate upload part for parallel - if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(from, to, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ + if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(from, to, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return result; } @@ -4542,17 +4524,14 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t // Multi request if(0 != (result = curlmulti.Request())){ S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - - S3fsCurl s3fscurl_abort(true); - int result2 = s3fscurl_abort.AbortMultipartUpload(to, upload_id); - s3fscurl_abort.DestroyCurlHandle(); - if(result2 != 0){ + int result2; + if(0 != (result2 = abort_multipart_upload_request(std::string(to), upload_id))){ S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2); } return result; } - if(0 != (result = CompleteMultipartPostRequest(to, upload_id, list))){ + if(0 != (result = MultipartUploadComplete(to, upload_id, list))){ return result; } return 0; diff --git a/src/curl.h b/src/curl.h index 8f6857a43d..089e42f33c 100644 --- a/src/curl.h +++ b/src/curl.h @@ -233,7 +233,6 @@ class S3fsCurl static int CurlProgress(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow); static std::string extractURI(const std::string& url); - static bool LocateBundle(); static size_t HeaderCallback(void *data, size_t blockSize, size_t numBlocks, void *userPtr); static size_t WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks, void *data); @@ -241,16 +240,17 @@ class S3fsCurl static size_t UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp); static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp); - static bool UploadMultipartPostCallback(S3fsCurl* s3fscurl, void* param); - static bool CopyMultipartPostCallback(S3fsCurl* s3fscurl, void* param); - static bool MixMultipartPostCallback(S3fsCurl* s3fscurl, void* param); - static std::unique_ptr UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl); - static std::unique_ptr CopyMultipartPostRetryCallback(S3fsCurl* s3fscurl); - static std::unique_ptr MixMultipartPostRetryCallback(S3fsCurl* s3fscurl); + static bool MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param); + static bool CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param); + static bool MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param); + static std::unique_ptr MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl); + static std::unique_ptr CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl); + static std::unique_ptr MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl); static std::unique_ptr ParallelGetObjectRetryCallback(S3fsCurl* s3fscurl); // lazy functions for set curl options - static bool CopyMultipartPostSetCurlOpts(S3fsCurl* s3fscurl); + static bool MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl); + static bool CopyMultipartUploadSetCurlOpts(S3fsCurl* s3fscurl); static bool PreGetObjectRequestSetCurlOpts(S3fsCurl* s3fscurl); static bool PreHeadRequestSetCurlOpts(S3fsCurl* s3fscurl); @@ -275,10 +275,10 @@ class S3fsCurl bool AddSseRequestHead(sse_type_t ssetype, const std::string& ssevalue, bool is_copy); std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token); std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token); - int UploadMultipartPostSetup(const char* tpath, int part_num, const std::string& upload_id); - int CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta); - bool UploadMultipartPostComplete(); - bool CopyMultipartPostComplete(); + int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id); + int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta); + bool MultipartUploadPartComplete(); + bool CopyMultipartUploadComplete(); int MapPutErrorResponse(int result); public: @@ -288,13 +288,10 @@ class S3fsCurl static bool InitMimeType(const std::string& strFile); static bool DestroyS3fsCurl(); static std::unique_ptr CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result); - static int ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd); + static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd); static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages); static int ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size); - // lazy functions for set curl options(public) - static bool UploadMultipartPostSetCurlOpts(S3fsCurl* s3fscurl); - // class methods(variables) static std::string LookupMimeType(const std::string& name); static bool SetCheckCertificate(bool isCertCheck); @@ -375,16 +372,16 @@ class S3fsCurl return PreHeadRequest(tpath.c_str(), bpath.c_str(), savedpath.c_str(), ssekey_pos); } int HeadRequest(const char* tpath, headers_t& meta); - int PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy); + int PutHeadRequest(const char* tpath, const headers_t& meta, bool is_copy); int PutRequest(const char* tpath, headers_t& meta, int fd); int PreGetObjectRequest(const char* tpath, int fd, off_t start, off_t size, sse_type_t ssetype, const std::string& ssevalue); - int GetObjectRequest(const char* tpath, int fd, off_t start = -1, off_t size = -1); + int GetObjectRequest(const char* tpath, int fd, off_t start, off_t size, sse_type_t ssetype, const std::string& ssevalue); int CheckBucket(const char* check_path, bool compat_dir, bool force_no_sse); int ListBucketRequest(const char* tpath, const char* query); - int PreMultipartPostRequest(const char* tpath, headers_t& meta, std::string& upload_id); - int CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, const etaglist_t& parts); - int UploadMultipartPostRequest(const char* tpath, int part_num, const std::string& upload_id); - bool MixMultipartPostComplete(); + int PreMultipartUploadRequest(const char* tpath, const headers_t& meta, std::string& upload_id); + int MultipartUploadComplete(const char* tpath, const std::string& upload_id, const etaglist_t& parts); + int MultipartUploadPartRequest(const char* tpath, int part_num, const std::string& upload_id); + bool MixMultipartUploadComplete(); int MultipartListRequest(std::string& body); int AbortMultipartUpload(const char* tpath, const std::string& upload_id); int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta); diff --git a/src/fdcache_entity.cpp b/src/fdcache_entity.cpp index e20d332ee9..7d66b45efb 100644 --- a/src/fdcache_entity.cpp +++ b/src/fdcache_entity.cpp @@ -40,13 +40,35 @@ #include "s3fs_logger.h" #include "s3fs_util.h" #include "curl.h" +#include "curl_util.h" #include "s3fs_cred.h" +#include "threadpoolman.h" +#include "s3fs_threadreqs.h" //------------------------------------------------ // Symbols //------------------------------------------------ static constexpr int MAX_MULTIPART_CNT = 10 * 1000; // S3 multipart max count +//------------------------------------------------ +// Structure of parameters to pass to thread +//------------------------------------------------ +// +// Multipart Upload Request parameter structure for Thread Pool. +// +// ([TODO] This is a temporary structure is moved when S3fsMultiCurl is deprecated.) +// +struct multipart_upload_req_thparam +{ + std::string path; + std::string upload_id; + int fd = -1; + off_t start = 0; + off_t size = 0; + etagpair* petagpair = nullptr; + int result = 0; +}; + //------------------------------------------------ // FdEntity class variables //------------------------------------------------ @@ -106,6 +128,25 @@ ino_t FdEntity::GetInode(int fd) return st.st_ino; } +// +// Worker function for multipart upload request +// +// ([TODO] This is a temporary structure is moved when S3fsMultiCurl is deprecated.) +// +void* FdEntity::MultipartUploadThreadWorker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Multipart Upload Request [path=%s][upload id=%s][fd=%d][start=%lld][size=%lld][etagpair=%p]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->fd, static_cast(pthparam->start), static_cast(pthparam->size), pthparam->petagpair); + + S3fsCurl s3fscurl(true); + pthparam->result = s3fscurl.MultipartUploadRequest(pthparam->upload_id, pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, pthparam->petagpair); + + return reinterpret_cast(pthparam->result); +} + //------------------------------------------------ // FdEntity methods //------------------------------------------------ @@ -1039,8 +1080,7 @@ int FdEntity::Load(off_t start, off_t size, bool is_modified_flag) }else{ // single request if(0 < need_load_size){ - S3fsCurl s3fscurl; - result = s3fscurl.GetObjectRequest(path.c_str(), physical_fd, iter->offset, need_load_size); + result = get_object_request(path, physical_fd, iter->offset, need_load_size); }else{ result = 0; } @@ -1164,8 +1204,7 @@ int FdEntity::NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start, off_t si // single area get request if(0 < need_load_size){ - S3fsCurl s3fscurl; - if(0 != (result = s3fscurl.GetObjectRequest(path.c_str(), tmpfd, offset, oneread))){ + if(0 != (result = get_object_request(path, tmpfd, offset, oneread))){ S3FS_PRN_ERR("failed to get object(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(offset), static_cast(oneread), tmpfd); break; } @@ -1180,9 +1219,9 @@ int FdEntity::NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start, off_t si }else{ // already loaded area } - // single area upload by multipart post - if(0 != (result = NoCacheMultipartPost(pseudo_obj, upload_fd, offset, oneread))){ - S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(offset), static_cast(oneread), upload_fd); + // single area upload by multipart upload + if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, upload_fd, offset, oneread))){ + S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(offset), static_cast(oneread), upload_fd); break; } } @@ -1222,35 +1261,57 @@ int FdEntity::NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start, off_t si return result; } +// +// Common method that calls S3fsCurl::PreMultipartUploadRequest via pre_multipart_upload_request +// // [NOTE] -// At no disk space for caching object. -// This method is starting multipart uploading. +// If the request is successful, initialize upload_id. // -int FdEntity::NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj) +int FdEntity::PreMultipartUploadRequest(PseudoFdInfo* pseudo_obj) { if(!pseudo_obj){ S3FS_PRN_ERR("Internal error, pseudo fd object pointer is null."); return -EIO; } - // initialize multipart upload values - pseudo_obj->ClearUploadInfo(true); - - S3fsCurl s3fscurl(true); + // get upload_id std::string upload_id; int result; - if(0 != (result = s3fscurl.PreMultipartPostRequest(path.c_str(), orgmeta, upload_id))){ + if(0 != (result = pre_multipart_upload_request(path, orgmeta, upload_id))){ return result; } - s3fscurl.DestroyCurlHandle(); + + // reset upload_id + if(!pseudo_obj->InitialUploadInfo(upload_id)){ + S3FS_PRN_ERR("failed to initialize upload id(%s)", upload_id.c_str()); + return -EIO; + } // Clear the dirty flag, because the meta data is updated. pending_status = pending_status_t::NO_UPDATE_PENDING; - // reset upload_id - if(!pseudo_obj->InitialUploadInfo(upload_id)){ + return 0; +} + +// [NOTE] +// At no disk space for caching object. +// This method is starting multipart uploading. +// +int FdEntity::NoCachePreMultipartUploadRequest(PseudoFdInfo* pseudo_obj) +{ + if(!pseudo_obj){ + S3FS_PRN_ERR("Internal error, pseudo fd object pointer is null."); return -EIO; } + + // initialize multipart upload values + pseudo_obj->ClearUploadInfo(true); + + int result; + if(0 != (result = PreMultipartUploadRequest(pseudo_obj))){ + return result; + } + return 0; } @@ -1258,55 +1319,74 @@ int FdEntity::NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj) // At no disk space for caching object. // This method is uploading one part of multipart. // -int FdEntity::NoCacheMultipartPost(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size) +// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.) +// +int FdEntity::NoCacheMultipartUploadRequest(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size) { if(-1 == tgfd || !pseudo_obj || !pseudo_obj->IsUploading()){ - S3FS_PRN_ERR("Need to initialize for multipart post."); + S3FS_PRN_ERR("Need to initialize for multipart upload."); return -EIO; } + // parameter for thread worker + multipart_upload_req_thparam thargs; + thargs.path = path; + thargs.upload_id.clear(); + thargs.fd = tgfd; + thargs.start = start; + thargs.size = size; + thargs.petagpair = nullptr; + thargs.result = 0; + // get upload id - std::string upload_id; - if(!pseudo_obj->GetUploadId(upload_id)){ + if(!pseudo_obj->GetUploadId(thargs.upload_id)){ return -EIO; } // append new part and get it's etag string pointer - etagpair* petagpair = nullptr; - if(!pseudo_obj->AppendUploadPart(start, size, false, &petagpair)){ + if(!pseudo_obj->AppendUploadPart(start, size, false, &(thargs.petagpair))){ return -EIO; } - S3fsCurl s3fscurl(true); - return s3fscurl.MultipartUploadRequest(upload_id, path.c_str(), tgfd, start, size, petagpair); + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = FdEntity::MultipartUploadThreadWorker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Get Object Request Thread Worker"); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_ERR("Multipart Upload Request(path=%s, upload_id=%s, fd=%d, start=%lld, size=%lld) returns with error(%d)", path.c_str(), thargs.upload_id.c_str(), tgfd, static_cast(start), static_cast(size), thargs.result); + return thargs.result; + } + return 0; } // [NOTE] // At no disk space for caching object. // This method is finishing multipart uploading. // -int FdEntity::NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj) +int FdEntity::NoCacheMultipartUploadComplete(PseudoFdInfo* pseudo_obj) { - etaglist_t etaglist; - if(!pseudo_obj || !pseudo_obj->IsUploading() || !pseudo_obj->GetEtaglist(etaglist)){ - S3FS_PRN_ERR("There is no upload id or etag list."); - return -EIO; - } - - // get upload id + // get upload id and etag list std::string upload_id; - if(!pseudo_obj->GetUploadId(upload_id)){ + etaglist_t parts; + if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(parts)){ return -EIO; } - S3fsCurl s3fscurl(true); - int result = s3fscurl.CompleteMultipartPostRequest(path.c_str(), upload_id, etaglist); - s3fscurl.DestroyCurlHandle(); - if(0 != result){ - S3fsCurl s3fscurl_abort(true); - int result2 = s3fscurl.AbortMultipartUpload(path.c_str(), upload_id); - s3fscurl_abort.DestroyCurlHandle(); - if(0 != result2){ + int result; + if(0 != (result = complete_multipart_upload_request(path, upload_id, parts))){ + S3FS_PRN_ERR("failed to complete multipart upload by errno(%d)", result); + untreated_list.ClearAll(); + pseudo_obj->ClearUploadInfo(); // clear multipart upload info + + int result2; + if(0 != (result2 = abort_multipart_upload_request(path, upload_id))){ S3FS_PRN_ERR("failed to abort multipart upload by errno(%d)", result2); } return result; @@ -1398,6 +1478,9 @@ int FdEntity::RowFlushHasLock(int fd, const char* tpath, bool force_sync) return result; } +// +// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.) +// int FdEntity::RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tpath) { S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd); @@ -1411,23 +1494,20 @@ int FdEntity::RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tp return -EBADF; } - int result; - std::string tmppath = path; - headers_t tmporgmeta = orgmeta; - // If there is no loading all of the area, loading all area. off_t restsize = pagelist.GetTotalUnloadedPageSize(); if(0 < restsize){ // check disk space if(!ReserveDiskSpace(restsize)){ // no enough disk space - S3FS_PRN_WARN("Not enough local storage to flush: [path=%s][pseudo_fd=%d][physical_fd=%d]", path.c_str(), pseudo_obj->GetPseudoFd(), physical_fd); + S3FS_PRN_WARN("Not enough local storage to flush: [path=%s][pseudo_fd=%d][physical_fd=%d]", (tpath ? tpath : path.c_str()), pseudo_obj->GetPseudoFd(), physical_fd); return -ENOSPC; // No space left on device } } FdManager::FreeReservedDiskSpace(restsize); // Always load all uninitialized area + int result; if(0 != (result = Load(/*start=*/ 0, /*size=*/ 0))){ S3FS_PRN_ERR("failed to upload all area(errno=%d)", result); return result; @@ -1445,21 +1525,45 @@ int FdEntity::RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tp S3FS_PRN_ERR("fstat is failed by errno(%d), but continue...", errno); } - S3fsCurl s3fscurl(true); - result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd); + // parameter for thread worker + put_req_thparam thargs; + thargs.path = tpath ? tpath : path; + thargs.meta = orgmeta; // copy + thargs.fd = physical_fd; + thargs.ahbe = true; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = put_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Put Request for Thread Worker"); + return -EIO; + } + if(0 != thargs.result){ + // continue... + S3FS_PRN_DBG("Put Request(%s) returns with errno(%d)", thargs.path.c_str(), thargs.result); + } // reset uploaded file size size_orgmeta = st.st_size; untreated_list.ClearAll(); - if(0 == result){ + if(0 == thargs.result){ pagelist.ClearAllModified(); } - return result; + return thargs.result; } +// +// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.) +// int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) { S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd); @@ -1479,7 +1583,7 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) // Check rest size and free disk space if(0 < restsize && !ReserveDiskSpace(restsize)){ // no enough disk space - if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){ + if(0 != (result = NoCachePreMultipartUploadRequest(pseudo_obj))){ S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result); return result; } @@ -1518,8 +1622,31 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) }else{ // normal uploading (too small part size) - S3fsCurl s3fscurl(true); - result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd); + + // parameter for thread worker + put_req_thparam thargs; + thargs.path = tpath ? tpath : tmppath; + thargs.meta = tmporgmeta; // copy + thargs.fd = physical_fd; + thargs.ahbe = true; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = put_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Put Request for Thread Worker"); + return -EIO; + } + if(0 != thargs.result){ + // continue... + S3FS_PRN_DBG("Put Request(%s) returns with errno(%d)", (tpath ? tpath : tmppath.c_str()), thargs.result); + } + result = thargs.result; } // reset uploaded file size @@ -1534,15 +1661,15 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) off_t untreated_start = 0; off_t untreated_size = 0; if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize(), 0) && 0 < untreated_size){ - if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ - S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); + if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, physical_fd, untreated_start, untreated_size))){ + S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); return result; } untreated_list.ClearParts(untreated_start, untreated_size); } // complete multipart uploading. - if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){ - S3FS_PRN_ERR("failed to complete(finish) multipart post for file(physical_fd=%d).", physical_fd); + if(0 != (result = NoCacheMultipartUploadComplete(pseudo_obj))){ + S3FS_PRN_ERR("failed to complete(finish) multipart upload for file(physical_fd=%d).", physical_fd); return result; } // truncate file to zero @@ -1563,6 +1690,9 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) return result; } +// +// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.) +// int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) { S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd); @@ -1582,7 +1712,7 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) // Check rest size and free disk space if(0 < restsize && !ReserveDiskSpace(restsize)){ // no enough disk space - if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){ + if(0 != (result = NoCachePreMultipartUploadRequest(pseudo_obj))){ S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result); return result; } @@ -1642,8 +1772,30 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) return result; } - S3fsCurl s3fscurl(true); - result = s3fscurl.PutRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd); + // parameter for thread worker + put_req_thparam thargs; + thargs.path = tpath ? tpath : tmppath; + thargs.meta = tmporgmeta; // copy + thargs.fd = physical_fd; + thargs.ahbe = true; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = put_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Put Request for Thread Worker"); + return -EIO; + } + if(0 != thargs.result){ + // continue... + S3FS_PRN_DBG("Put Request(%s) returns with errno(%d)", (tpath ? tpath : tmppath.c_str()), thargs.result); + } + result = thargs.result; } // reset uploaded file size @@ -1658,15 +1810,15 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) off_t untreated_start = 0; off_t untreated_size = 0; if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize(), 0) && 0 < untreated_size){ - if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ - S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); + if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, physical_fd, untreated_start, untreated_size))){ + S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); return result; } untreated_list.ClearParts(untreated_start, untreated_size); } // complete multipart uploading. - if(0 != (result = NoCacheCompleteMultipartPost(pseudo_obj))){ - S3FS_PRN_ERR("failed to complete(finish) multipart post for file(physical_fd=%d).", physical_fd); + if(0 != (result = NoCacheMultipartUploadComplete(pseudo_obj))){ + S3FS_PRN_ERR("failed to complete(finish) multipart upload for file(physical_fd=%d).", physical_fd); return result; } // truncate file to zero @@ -1687,6 +1839,9 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) return result; } +// +// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.) +// int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) { S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d][mix_upload=%s]", SAFESTRPTR(tpath), path.c_str(), (pseudo_obj ? pseudo_obj->GetPseudoFd() : -1), physical_fd, (FdEntity::mixmultipart ? "true" : "false")); @@ -1713,9 +1868,30 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat return result; } - headers_t tmporgmeta = orgmeta; - S3fsCurl s3fscurl(true); - result = s3fscurl.PutRequest(path.c_str(), tmporgmeta, physical_fd); + // parameter for thread worker + put_req_thparam thargs; + thargs.path = path; + thargs.meta = orgmeta; // copy + thargs.fd = physical_fd; + thargs.ahbe = true; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = put_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Put Request for Thread Worker"); + return -EIO; + } + if(0 != thargs.result){ + // continue... + S3FS_PRN_DBG("Put Request(%s) returns with errno(%d)", path.c_str(), thargs.result); + } + result = thargs.result; // reset uploaded file size size_orgmeta = st.st_size; @@ -1785,19 +1961,9 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat // // Multipart uploading hasn't started yet, so start it. // - S3fsCurl s3fscurl(true); - std::string upload_id; - if(0 != (result = s3fscurl.PreMultipartPostRequest(path.c_str(), orgmeta, upload_id))){ - S3FS_PRN_ERR("failed to setup multipart upload(create upload id) by errno(%d)", result); + if(0 != (result = PreMultipartUploadRequest(pseudo_obj))){ return result; } - if(!pseudo_obj->InitialUploadInfo(upload_id)){ - S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)"); - return -EIO; - } - - // Clear the dirty flag, because the meta data is updated. - pending_status = pending_status_t::NO_UPDATE_PENDING; } // @@ -1851,30 +2017,26 @@ int FdEntity::RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpat // Complete uploading // std::string upload_id; - etaglist_t etaglist; - if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(etaglist)){ + etaglist_t parts; + if(!pseudo_obj->GetUploadId(upload_id) || !pseudo_obj->GetEtaglist(parts)){ S3FS_PRN_ERR("There is no upload id or etag list."); untreated_list.ClearAll(); pseudo_obj->ClearUploadInfo(); // clear multipart upload info return -EIO; }else{ - S3fsCurl s3fscurl(true); - result = s3fscurl.CompleteMultipartPostRequest(path.c_str(), upload_id, etaglist); - s3fscurl.DestroyCurlHandle(); - if(0 != result){ + if(0 != (result = complete_multipart_upload_request(path, upload_id, parts))){ S3FS_PRN_ERR("failed to complete multipart upload by errno(%d)", result); untreated_list.ClearAll(); pseudo_obj->ClearUploadInfo(); // clear multipart upload info - S3fsCurl s3fscurl_abort(true); - int result2 = s3fscurl.AbortMultipartUpload(path.c_str(), upload_id); - s3fscurl_abort.DestroyCurlHandle(); - if(0 != result2){ + int result2; + if(0 != (result2 = abort_multipart_upload_request(path, upload_id))){ S3FS_PRN_ERR("failed to abort multipart upload by errno(%d)", result2); } return result; } } + untreated_list.ClearAll(); pseudo_obj->ClearUploadInfo(); // clear multipart upload info @@ -2126,7 +2288,7 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of S3FS_PRN_WARN("Not enough local storage to cache write request till multipart upload can start: [path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast(start), size); return -ENOSPC; // No space left on device } - if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){ + if(0 != (result = NoCachePreMultipartUploadRequest(pseudo_obj))){ S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result); return result; } @@ -2168,8 +2330,8 @@ ssize_t FdEntity::WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, of off_t untreated_size = 0; if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){ // when multipart max size is reached - if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ - S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); + if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, physical_fd, untreated_start, untreated_size))){ + S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); return result; } @@ -2210,7 +2372,7 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, S3FS_PRN_WARN("Not enough local storage to cache write request till multipart upload can start: [path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast(start), size); return -ENOSPC; // No space left on device } - if(0 != (result = NoCachePreMultipartPost(pseudo_obj))){ + if(0 != (result = NoCachePreMultipartUploadRequest(pseudo_obj))){ S3FS_PRN_ERR("failed to switch multipart uploading with no cache(errno=%d)", result); return result; } @@ -2243,8 +2405,8 @@ ssize_t FdEntity::WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t untreated_size = 0; if(untreated_list.GetLastUpdatedPart(untreated_start, untreated_size, S3fsCurl::GetMultipartSize())){ // when multipart max size is reached - if(0 != (result = NoCacheMultipartPost(pseudo_obj, physical_fd, untreated_start, untreated_size))){ - S3FS_PRN_ERR("failed to multipart post(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); + if(0 != (result = NoCacheMultipartUploadRequest(pseudo_obj, physical_fd, untreated_start, untreated_size))){ + S3FS_PRN_ERR("failed to multipart upload(start=%lld, size=%lld) for file(physical_fd=%d).", static_cast(untreated_start), static_cast(untreated_size), physical_fd); return result; } diff --git a/src/fdcache_entity.h b/src/fdcache_entity.h index 009d4659a3..f761e5ba2b 100644 --- a/src/fdcache_entity.h +++ b/src/fdcache_entity.h @@ -80,6 +80,7 @@ class FdEntity : public std::enable_shared_from_this private: static int FillFile(int fd, unsigned char byte, off_t size, off_t start); static ino_t GetInode(int fd); + static void* MultipartUploadThreadWorker(void* arg); // ([TODO] This is a temporary method is moved when S3fsMultiCurl is deprecated.) void Clear(); ino_t GetInode() const REQUIRES(FdEntity::fdent_data_lock); @@ -89,9 +90,10 @@ class FdEntity : public std::enable_shared_from_this bool IsUploading() REQUIRES(FdEntity::fdent_lock); bool SetAllStatus(bool is_loaded) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock); bool SetAllStatusUnloaded() REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock) { return SetAllStatus(false); } - int NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock); - int NoCacheMultipartPost(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size) REQUIRES(FdEntity::fdent_lock); - int NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock); + int PreMultipartUploadRequest(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock, fdent_data_lock); + int NoCachePreMultipartUploadRequest(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock); + int NoCacheMultipartUploadRequest(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size) REQUIRES(FdEntity::fdent_lock); + int NoCacheMultipartUploadComplete(PseudoFdInfo* pseudo_obj) REQUIRES(FdEntity::fdent_lock); int RowFlushHasLock(int fd, const char* tpath, bool force_sync) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock); int RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tpath) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock); int RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock); diff --git a/src/fdcache_fdinfo.cpp b/src/fdcache_fdinfo.cpp index 596a1f0ba4..32d9c9dd6c 100644 --- a/src/fdcache_fdinfo.cpp +++ b/src/fdcache_fdinfo.cpp @@ -37,16 +37,37 @@ #include "curl.h" #include "string_util.h" #include "threadpoolman.h" +#include "s3fs_threadreqs.h" + +//------------------------------------------------ +// Structure of parameters to pass to thread +//------------------------------------------------ +// [NOTE] +// The processing related to this is currently temporarily implemented +// in this file, but will be moved to a separate file at a later. +// +struct pseudofdinfo_mpupload_thparam +{ + PseudoFdInfo* ppseudofdinfo = nullptr; + std::string path; + std::string upload_id; + int upload_fd = -1; + off_t start = 0; + off_t size = 0; + bool is_copy = false; + int part_num = -1; + etagpair* petag = nullptr; +}; //------------------------------------------------ // PseudoFdInfo class methods //------------------------------------------------ // -// Worker function for uploading +// Thread Worker function for uploading // void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg) { - std::unique_ptr pthparam(static_cast(arg)); + std::unique_ptr pthparam(static_cast(arg)); if(!pthparam || !(pthparam->ppseudofdinfo)){ return reinterpret_cast(-EIO); } @@ -82,7 +103,7 @@ void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg) // Send request and get result if(0 == (result = s3fscurl->RequestPerform())){ S3FS_PRN_DBG("succeed uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast(pthparam->start), static_cast(pthparam->size), pthparam->part_num); - if(!s3fscurl->MixMultipartPostComplete()){ + if(!s3fscurl->MixMultipartUploadComplete()){ S3FS_PRN_ERR("failed completion uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast(pthparam->start), static_cast(pthparam->size), pthparam->part_num); result = -EIO; } @@ -448,7 +469,7 @@ bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_ } // make parameter for my thread - auto* thargs = new pseudofdinfo_thparam; + auto* thargs = new pseudofdinfo_mpupload_thparam; thargs->ppseudofdinfo = this; thargs->path = SAFESTRPTR(path); thargs->upload_id = tmp_upload_id; @@ -497,6 +518,30 @@ bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_li return true; } +// +// Common method that calls S3fsCurl::PreMultipartUploadRequest via pre_multipart_upload_request +// +// [NOTE] +// If the request is successful, initialize upload_id. +// +bool PseudoFdInfo::PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta) +{ + // get upload_id + std::string new_upload_id; + if(0 != pre_multipart_upload_request(strpath, meta, new_upload_id)){ + return false; + } + + // reset upload_id + if(!RowInitialUploadInfo(new_upload_id, false/* not need to cancel */)){ + S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)"); + return false; + } + S3FS_PRN_DBG("succeed to setup multipart upload(set upload id to object)"); + + return true; +} + // // Upload the last updated Untreated area // @@ -579,18 +624,9 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_ // Has multipart uploading already started? // if(!IsUploading()){ - // Multipart uploading hasn't started yet, so start it. - // - S3fsCurl s3fscurl(true); - std::string tmp_upload_id; - int result; - if(0 != (result = s3fscurl.PreMultipartPostRequest(path, meta, tmp_upload_id))){ - S3FS_PRN_ERR("failed to setup multipart upload(create upload id) by errno(%d)", result); - return result; - } - if(!RowInitialUploadInfo(tmp_upload_id, false/* not need to cancel */)){ - S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)"); - return result; + std::string strpath = SAFESTRPTR(path); + if(!PreMultipartUploadRequest(strpath, meta)){ + return -EIO; } } diff --git a/src/fdcache_fdinfo.h b/src/fdcache_fdinfo.h index 505d77fc81..fe04d1d0fa 100644 --- a/src/fdcache_fdinfo.h +++ b/src/fdcache_fdinfo.h @@ -33,24 +33,6 @@ class UntreatedParts; -//------------------------------------------------ -// Structure of parameters to pass to thread -//------------------------------------------------ -class PseudoFdInfo; - -struct pseudofdinfo_thparam -{ - PseudoFdInfo* ppseudofdinfo = nullptr; - std::string path; - std::string upload_id; - int upload_fd = -1; - off_t start = 0; - off_t size = 0; - bool is_copy = false; - int part_num = -1; - etagpair* petag = nullptr; -}; - //------------------------------------------------ // Class PseudoFdInfo //------------------------------------------------ @@ -83,6 +65,7 @@ class PseudoFdInfo bool GetUploadInfo(std::string& id, int& fd) const; bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy); bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag); + bool PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta); bool CancelAllThreads(); bool ExtractUploadPartsFromUntreatedArea(off_t untreated_start, off_t untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size); bool IsUploadingHasLock() const REQUIRES(upload_list_lock); diff --git a/src/mpu_util.cpp b/src/mpu_util.cpp index dd39725458..838e5862fc 100644 --- a/src/mpu_util.cpp +++ b/src/mpu_util.cpp @@ -28,6 +28,7 @@ #include "s3fs_xml.h" #include "s3fs_auth.h" #include "string_util.h" +#include "s3fs_threadreqs.h" //------------------------------------------------------------------- // Global variables @@ -68,7 +69,6 @@ static bool abort_incomp_mpu_list(const incomp_mpu_list_t& list, time_t abort_ti time_t now_time = time(nullptr); // do removing. - S3fsCurl s3fscurl; bool result = true; for(auto iter = list.cbegin(); iter != list.cend(); ++iter){ const char* tpath = (*iter).key.c_str(); @@ -85,15 +85,12 @@ static bool abort_incomp_mpu_list(const incomp_mpu_list_t& list, time_t abort_ti } } - if(0 != s3fscurl.AbortMultipartUpload(tpath, upload_id)){ + if(0 != abort_multipart_upload_request(std::string(tpath), upload_id)){ S3FS_PRN_EXIT("Failed to remove %s multipart uploading object.", tpath); result = false; }else{ printf("Succeed to remove %s multipart uploading object.\n", tpath); } - - // reset(initialize) curl object - s3fscurl.DestroyCurlHandle(); } return result; } diff --git a/src/s3fs.cpp b/src/s3fs.cpp index 17326c294d..0234fa3bf6 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -54,6 +54,7 @@ #include "s3fs_cred.h" #include "s3fs_help.h" #include "s3fs_util.h" +#include "s3fs_threadreqs.h" #include "mpu_util.h" #include "threadpoolman.h" @@ -303,7 +304,7 @@ static bool is_special_name_folder_object(const char* path) } std::string strpath = path; - headers_t header; + headers_t header; if(std::string::npos == strpath.find("_$folder$", 0)){ if('/' == *strpath.rbegin()){ @@ -311,12 +312,15 @@ static bool is_special_name_folder_object(const char* path) } strpath += "_$folder$"; } - S3fsCurl s3fscurl; - if(0 != s3fscurl.HeadRequest(strpath.c_str(), header)){ + + // send request + if(0 != head_request(strpath, header)){ return false; } + header.clear(); S3FS_MALLOCTRIM(0); + return true; } @@ -414,9 +418,8 @@ static int chk_dir_object_type(const char* path, std::string& newpath, std::stri static int remove_old_type_dir(const std::string& path, dirtype type) { if(IS_RMTYPEDIR(type)){ - S3fsCurl s3fscurl; - int result = s3fscurl.DeleteRequest(path.c_str()); - if(0 != result && -ENOENT != result){ + int result; + if(0 != (result = delete_request(path))){ return result; } // succeed removing or not found the directory @@ -453,7 +456,6 @@ static int get_object_attribute(const char* path, struct stat* pstbuf, headers_t headers_t tmpHead; headers_t* pheader = pmeta ? pmeta : &tmpHead; std::string strpath; - S3fsCurl s3fscurl; bool forcedir = false; bool is_mountpoint = false; // path is the mount point bool is_bucket_mountpoint = false; // path is the mount point which is the bucket root @@ -519,8 +521,9 @@ static int get_object_attribute(const char* path, struct stat* pstbuf, headers_t }else{ strpath = path; } - result = s3fscurl.HeadRequest(strpath.c_str(), (*pheader)); - s3fscurl.DestroyCurlHandle(); + + // get headers + result = head_request(strpath, *pheader); // if not found target path object, do over checking if(-EPERM == result){ @@ -548,22 +551,26 @@ static int get_object_attribute(const char* path, struct stat* pstbuf, headers_t if('/' != *strpath.rbegin() && std::string::npos == strpath.find("_$folder$", 0)){ // now path is "object", do check "object/" for over checking strpath += "/"; - result = s3fscurl.HeadRequest(strpath.c_str(), (*pheader)); - s3fscurl.DestroyCurlHandle(); + + // re-get headers + result = head_request(strpath, *pheader); } if(support_compat_dir && 0 != result){ // now path is "object/", do check "object_$folder$" for over checking strpath.erase(strpath.length() - 1); strpath += "_$folder$"; - result = s3fscurl.HeadRequest(strpath.c_str(), (*pheader)); - s3fscurl.DestroyCurlHandle(); - - if(0 != result){ - // cut "_$folder$" for over checking "no dir object" after here - if(std::string::npos != (Pos = strpath.find("_$folder$", 0))){ - strpath.erase(Pos); - } - } + + // re-get headers + result = head_request(strpath, *pheader); + + // cppcheck-suppress unmatchedSuppression + // cppcheck-suppress knownConditionTrueFalse + if(0 != result){ + // cut "_$folder$" for over checking "no dir object" after here + if(std::string::npos != (Pos = strpath.find("_$folder$", 0))){ + strpath.erase(Pos); + } + } } } if(0 != result && std::string::npos == strpath.find("_$folder$", 0)){ @@ -907,7 +914,6 @@ static int get_local_fent(AutoFdEntity& autoent, FdEntity **entity, const char* int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size) { int result; - S3fsCurl s3fscurl(true); off_t size; std::string strpath; @@ -932,11 +938,16 @@ int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_siz } if(!nocopyapi && !nomultipart && size >= multipart_threshold){ + // [TODO] + // This object will be removed after removing S3fsMultiCurl + // + S3fsCurl s3fscurl(true); if(0 != (result = s3fscurl.MultipartHeadRequest(strpath.c_str(), size, meta))){ return result; } }else{ - if(0 != (result = s3fscurl.PutHeadRequest(strpath.c_str(), meta, is_copy))){ + // send put head request + if(0 != (result = put_head_request(strpath, meta, is_copy))){ return result; } } @@ -1060,8 +1071,11 @@ static int create_file_object(const char* path, mode_t mode, uid_t uid, gid_t gi meta["x-amz-meta-ctime"] = strnow; meta["x-amz-meta-mtime"] = strnow; - S3fsCurl s3fscurl(true); - return s3fscurl.PutRequest(path, meta, -1); // fd=-1 means for creating zero byte object. + int result; + if(0 != (result = put_request(std::string(SAFESTRPTR(path)), meta, -1, true/* ahbe */))){ + return result; + } + return 0; } static int s3fs_mknod(const char *_path, mode_t mode, dev_t rdev) @@ -1188,8 +1202,11 @@ static int create_directory_object(const char* path, mode_t mode, const struct t meta["x-amz-meta-xattr"] = pxattrvalue; } - S3fsCurl s3fscurl; - return s3fscurl.PutRequest(tpath.c_str(), meta, -1); // fd=-1 means for creating zero byte object. + int result; + if(0 != (result = put_request(tpath, meta, -1, false/* ahbe */))){ + return result; + } + return 0; } static int s3fs_mkdir(const char* _path, mode_t mode) @@ -1250,8 +1267,11 @@ static int s3fs_unlink(const char* _path) if(0 != (result = check_parent_object_access(path, W_OK | X_OK))){ return result; } - S3fsCurl s3fscurl; - result = s3fscurl.DeleteRequest(path); + + if(0 != (result = delete_request(std::string(SAFESTRPTR(path))))){ + return result; + } + StatCache::getStatCacheData()->DelStat(path); StatCache::getStatCacheData()->DelSymlink(path); FdManager::DeleteCacheFile(path); @@ -1304,9 +1324,10 @@ static int s3fs_rmdir(const char* _path) if('/' != *strpath.rbegin()){ strpath += "/"; } - S3fsCurl s3fscurl; - result = s3fscurl.DeleteRequest(strpath.c_str()); - s3fscurl.DestroyCurlHandle(); + + // delete request + result = delete_request(strpath); + StatCache::getStatCacheData()->DelStat(strpath); // double check for old version(before 1.63) @@ -1320,8 +1341,9 @@ static int s3fs_rmdir(const char* _path) if(0 == get_object_attribute(strpath.c_str(), &stbuf, nullptr, false)){ if(S_ISDIR(stbuf.st_mode)){ // Found "dir" object. - result = s3fscurl.DeleteRequest(strpath.c_str()); - s3fscurl.DestroyCurlHandle(); + + // delete request + result = delete_request(strpath); StatCache::getStatCacheData()->DelStat(strpath); } } @@ -1332,7 +1354,9 @@ static int s3fs_rmdir(const char* _path) // This processing is necessary for other S3 clients compatibility. if(is_special_name_folder_object(strpath.c_str())){ strpath += "_$folder$"; - result = s3fscurl.DeleteRequest(strpath.c_str()); + + // delete request + result = delete_request(strpath); } // update parent directory timestamp @@ -1598,6 +1622,9 @@ static int rename_large_object(const char* from, const char* to) return result; } + // [TODO] + // This object will be removed after removing S3fsMultiCurl + // S3fsCurl s3fscurl(true); if(0 != (result = s3fscurl.MultipartRenameRequest(from, to, meta, buf.st_size))){ return result; @@ -3121,6 +3148,9 @@ static int s3fs_opendir(const char* _path, struct fuse_file_info* fi) return result; } +// [TODO] +// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl +// // cppcheck-suppress unmatchedSuppression // cppcheck-suppress constParameterCallback static bool multi_head_callback(S3fsCurl* s3fscurl, void* param) @@ -3163,6 +3193,9 @@ struct multi_head_notfound_callback_param s3obj_list_t notfound_list; }; +// [TODO] +// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl +// static bool multi_head_notfound_callback(S3fsCurl* s3fscurl, void* param) { if(!s3fscurl){ @@ -3184,6 +3217,9 @@ static bool multi_head_notfound_callback(S3fsCurl* s3fscurl, void* param) return true; } +// [TODO] +// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl +// static std::unique_ptr multi_head_retry_callback(S3fsCurl* s3fscurl) { if(!s3fscurl){ @@ -3220,6 +3256,9 @@ static std::unique_ptr multi_head_retry_callback(S3fsCurl* s3fscurl) static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf, fuse_fill_dir_t filler) { + // [TODO] + // This will be checked and changed after removing S3fsMultiCurl + // S3fsMultiCurl curlmulti(S3fsCurl::GetMaxMultiRequest(), true); // [NOTE] run all requests to completion even if some requests fail. s3obj_list_t headlist; int result = 0; @@ -3393,7 +3432,6 @@ static int list_bucket(const char* path, S3ObjList& head, const char* delimiter, std::string next_continuation_token; std::string next_marker; bool truncated = true; - S3fsCurl s3fscurl; S3FS_PRN_INFO1("[path=%s]", path); @@ -3420,10 +3458,13 @@ static int list_bucket(const char* path, S3ObjList& head, const char* delimiter, } while(truncated){ - // append parameters to query in alphabetical order + int result; std::string each_query; + std::string responseBody; + + // append parameters to query in alphabetical order if(!next_continuation_token.empty()){ - each_query += "continuation-token=" + urlEncodePath(next_continuation_token) + "&"; + each_query += "continuation-token=" + urlEncodePath(next_continuation_token) + "&"; next_continuation_token = ""; } each_query += query_delimiter; @@ -3437,20 +3478,17 @@ static int list_bucket(const char* path, S3ObjList& head, const char* delimiter, each_query += query_maxkey; each_query += query_prefix; - // request - int result; - if(0 != (result = s3fscurl.ListBucketRequest(path, each_query.c_str()))){ - S3FS_PRN_ERR("ListBucketRequest returns with error."); + // send request + if(0 != (result = list_bucket_request(std::string(SAFESTRPTR(path)), each_query, responseBody))){ return result; } - const std::string& body = s3fscurl.GetBodyData(); // [NOTE] // CR code(\r) is replaced with LF(\n) by xmlReadMemory() function. // To prevent that, only CR code is encoded by following function. // The encoded CR code is decoded with append_objects_from_xml(_ex). // - std::string encbody = get_encoded_cr_code(body.c_str()); + std::string encbody = get_encoded_cr_code(responseBody.c_str()); // xmlDocPtr std::unique_ptr doc(xmlReadMemory(encbody.c_str(), static_cast(encbody.size()), "", nullptr, 0), xmlFreeDoc); @@ -3488,9 +3526,6 @@ static int list_bucket(const char* path, S3ObjList& head, const char* delimiter, } } - // reset(initialize) curl object - s3fscurl.DestroyCurlHandle(); - if(check_content_only){ break; } @@ -4211,6 +4246,11 @@ static void* s3fs_init(struct fuse_conn_info* conn) S3FS_PRN_DBG("Could not initialize cache directory."); } + if(!ThreadPoolMan::Initialize(max_thread_count)){ + S3FS_PRN_CRIT("Could not create thread pool(%d)", max_thread_count); + s3fs_exit_fuseloop(EXIT_FAILURE); + } + // check loading IAM role name if(!ps3fscred->LoadIAMRoleFromMetaData()){ S3FS_PRN_CRIT("could not load IAM role name from meta data."); @@ -4238,11 +4278,6 @@ static void* s3fs_init(struct fuse_conn_info* conn) conn->want |= FUSE_CAP_BIG_WRITES; } - if(!ThreadPoolMan::Initialize(max_thread_count)){ - S3FS_PRN_CRIT("Could not create thread pool(%d)", max_thread_count); - s3fs_exit_fuseloop(EXIT_FAILURE); - } - // Signal object if(!S3fsSignals::Initialize()){ S3FS_PRN_ERR("Failed to initialize signal object, but continue..."); @@ -4385,124 +4420,118 @@ static int s3fs_check_service() return EXIT_FAILURE; } - S3fsCurl s3fscurl; - bool force_no_sse = false; - - while(0 > s3fscurl.CheckBucket(get_realpath("/").c_str(), support_compat_dir, force_no_sse)){ - // get response code - bool do_retry = false; - long responseCode = s3fscurl.GetLastResponseCode(); - - // check wrong endpoint, and automatically switch endpoint - if(300 <= responseCode && responseCode < 500){ - - // check region error(for putting message or retrying) - const std::string& body = s3fscurl.GetBodyData(); - std::string expectregion; - std::string expectendpoint; + bool forceNoSSE = false; // it is not mandatory at first. + + for(bool isLoop = true; isLoop; ){ + long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET; + std::string responseBody; + if(0 > check_service_request(get_realpath("/"), forceNoSSE, support_compat_dir, responseCode, responseBody)){ + // check wrong endpoint, and automatically switch endpoint + if(300 <= responseCode && responseCode < 500){ + // check region error(for putting message or retrying) + std::string expectregion; + std::string expectendpoint; + + // Check if any case can be retried + if(check_region_error(responseBody.c_str(), responseBody.size(), expectregion)){ + // [NOTE] + // If endpoint is not specified(using us-east-1 region) and + // an error is encountered accessing a different region, we + // will retry the check on the expected region. + // see) https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html#access-bucket-intro + // + if(s3host != "http://s3.amazonaws.com" && s3host != "https://s3.amazonaws.com"){ + // specified endpoint for specified url is wrong. + if(is_specified_endpoint){ + S3FS_PRN_CRIT("The bucket region is not '%s'(specified) for specified url(%s), it is correctly '%s'. You should specify url(http(s)://s3-%s.amazonaws.com) and endpoint(%s) option.", endpoint.c_str(), s3host.c_str(), expectregion.c_str(), expectregion.c_str(), expectregion.c_str()); + }else{ + S3FS_PRN_CRIT("The bucket region is not '%s'(default) for specified url(%s), it is correctly '%s'. You should specify url(http(s)://s3-%s.amazonaws.com) and endpoint(%s) option.", endpoint.c_str(), s3host.c_str(), expectregion.c_str(), expectregion.c_str(), expectregion.c_str()); + } + isLoop = false; + + }else if(is_specified_endpoint){ + // specified endpoint is wrong. + S3FS_PRN_CRIT("The bucket region is not '%s'(specified), it is correctly '%s'. You should specify endpoint(%s) option.", endpoint.c_str(), expectregion.c_str(), expectregion.c_str()); + isLoop = false; + + }else if(S3fsCurl::GetSignatureType() == signature_type_t::V4_ONLY || S3fsCurl::GetSignatureType() == signature_type_t::V2_OR_V4){ + // current endpoint and url are default value, so try to connect to expected region. + S3FS_PRN_CRIT("Failed to connect region '%s'(default), so retry to connect region '%s' for url(http(s)://s3-%s.amazonaws.com).", endpoint.c_str(), expectregion.c_str(), expectregion.c_str()); + + // change endpoint + endpoint = expectregion; + + // change url + if(s3host == "http://s3.amazonaws.com"){ + s3host = "http://s3-" + endpoint + ".amazonaws.com"; + }else if(s3host == "https://s3.amazonaws.com"){ + s3host = "https://s3-" + endpoint + ".amazonaws.com"; + } - // Check if any case can be retried - if(check_region_error(body.c_str(), body.size(), expectregion)){ - // [NOTE] - // If endpoint is not specified(using us-east-1 region) and - // an error is encountered accessing a different region, we - // will retry the check on the expected region. - // see) https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html#access-bucket-intro - // - if(s3host != "http://s3.amazonaws.com" && s3host != "https://s3.amazonaws.com"){ - // specified endpoint for specified url is wrong. - if(is_specified_endpoint){ - S3FS_PRN_CRIT("The bucket region is not '%s'(specified) for specified url(%s), it is correctly '%s'. You should specify url(http(s)://s3-%s.amazonaws.com) and endpoint(%s) option.", endpoint.c_str(), s3host.c_str(), expectregion.c_str(), expectregion.c_str(), expectregion.c_str()); }else{ - S3FS_PRN_CRIT("The bucket region is not '%s'(default) for specified url(%s), it is correctly '%s'. You should specify url(http(s)://s3-%s.amazonaws.com) and endpoint(%s) option.", endpoint.c_str(), s3host.c_str(), expectregion.c_str(), expectregion.c_str(), expectregion.c_str()); + S3FS_PRN_CRIT("The bucket region is not '%s'(default), it is correctly '%s'. You should specify endpoint(%s) option.", endpoint.c_str(), expectregion.c_str(), expectregion.c_str()); + isLoop = false; } - }else if(is_specified_endpoint){ - // specified endpoint is wrong. - S3FS_PRN_CRIT("The bucket region is not '%s'(specified), it is correctly '%s'. You should specify endpoint(%s) option.", endpoint.c_str(), expectregion.c_str(), expectregion.c_str()); - - }else if(S3fsCurl::GetSignatureType() == signature_type_t::V4_ONLY || S3fsCurl::GetSignatureType() == signature_type_t::V2_OR_V4){ - // current endpoint and url are default value, so try to connect to expected region. - S3FS_PRN_CRIT("Failed to connect region '%s'(default), so retry to connect region '%s' for url(http(s)://s3-%s.amazonaws.com).", endpoint.c_str(), expectregion.c_str(), expectregion.c_str()); - - // change endpoint - endpoint = expectregion; - - // change url - if(s3host == "http://s3.amazonaws.com"){ - s3host = "http://s3-" + endpoint + ".amazonaws.com"; - }else if(s3host == "https://s3.amazonaws.com"){ - s3host = "https://s3-" + endpoint + ".amazonaws.com"; + }else if(check_endpoint_error(responseBody.c_str(), responseBody.size(), expectendpoint)){ + // redirect error + if(pathrequeststyle){ + S3FS_PRN_CRIT("S3 service returned PermanentRedirect (current is url(%s) and endpoint(%s)). You need to specify correct url(http(s)://s3-.amazonaws.com) and endpoint option with use_path_request_style option.", s3host.c_str(), endpoint.c_str()); + }else{ + S3FS_PRN_CRIT("S3 service returned PermanentRedirect with %s (current is url(%s) and endpoint(%s)). You need to specify correct endpoint option.", expectendpoint.c_str(), s3host.c_str(), endpoint.c_str()); } + return EXIT_FAILURE; - // Retry with changed host - s3fscurl.DestroyCurlHandle(); - do_retry = true; + }else if(check_invalid_sse_arg_error(responseBody.c_str(), responseBody.size())){ + // SSE argument error, so retry it without SSE + S3FS_PRN_CRIT("S3 service returned InvalidArgument(x-amz-server-side-encryption), so retry without adding x-amz-server-side-encryption."); - }else{ - S3FS_PRN_CRIT("The bucket region is not '%s'(default), it is correctly '%s'. You should specify endpoint(%s) option.", endpoint.c_str(), expectregion.c_str(), expectregion.c_str()); - } - - }else if(check_endpoint_error(body.c_str(), body.size(), expectendpoint)){ - // redirect error - if(pathrequeststyle){ - S3FS_PRN_CRIT("S3 service returned PermanentRedirect (current is url(%s) and endpoint(%s)). You need to specify correct url(http(s)://s3-.amazonaws.com) and endpoint option with use_path_request_style option.", s3host.c_str(), endpoint.c_str()); - }else{ - S3FS_PRN_CRIT("S3 service returned PermanentRedirect with %s (current is url(%s) and endpoint(%s)). You need to specify correct endpoint option.", expectendpoint.c_str(), s3host.c_str(), endpoint.c_str()); + // Retry without sse parameters + forceNoSSE = true; } - return EXIT_FAILURE; - - }else if(check_invalid_sse_arg_error(body.c_str(), body.size())){ - // SSE argument error, so retry it without SSE - S3FS_PRN_CRIT("S3 service returned InvalidArgument(x-amz-server-side-encryption), so retry without adding x-amz-server-side-encryption."); - - // Retry without sse parameters - s3fscurl.DestroyCurlHandle(); - do_retry = true; - force_no_sse = true; } - } - // Try changing signature from v4 to v2 - // - // [NOTE] - // If there is no case to retry with the previous checks, and there - // is a chance to retry with signature v2, prepare to retry with v2. - // - if(!do_retry && (responseCode == 400 || responseCode == 403) && S3fsCurl::GetSignatureType() == signature_type_t::V2_OR_V4){ - // switch sigv2 - S3FS_PRN_CRIT("Failed to connect by sigv4, so retry to connect by signature version 2. But you should to review url and endpoint option."); + // Try changing signature from v4 to v2 + // + // [NOTE] + // If there is no case to retry with the previous checks, and there + // is a chance to retry with signature v2, prepare to retry with v2. + // + if(!isLoop && (responseCode == 400 || responseCode == 403) && S3fsCurl::GetSignatureType() == signature_type_t::V2_OR_V4){ + // switch sigv2 + S3FS_PRN_CRIT("Failed to connect by sigv4, so retry to connect by signature version 2. But you should to review url and endpoint option."); - // retry to check with sigv2 - s3fscurl.DestroyCurlHandle(); - do_retry = true; - S3fsCurl::SetSignatureType(signature_type_t::V2_ONLY); - } + // retry to check with sigv2 + isLoop = true; + S3fsCurl::SetSignatureType(signature_type_t::V2_ONLY); + } - // check errors(after retrying) - if(!do_retry && responseCode != 200 && responseCode != 301){ - // parse error message if existed - std::string errMessage; - const std::string& body = s3fscurl.GetBodyData(); - check_error_message(body.c_str(), body.size(), errMessage); - - if(responseCode == 400){ - S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bad Request(host=%s, message=%s)", s3host.c_str(), errMessage.c_str()); - }else if(responseCode == 403){ - S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Invalid Credentials(host=%s, message=%s)", s3host.c_str(), errMessage.c_str()); - }else if(responseCode == 404){ - if(mount_prefix.empty()){ - S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bucket or directory not found(host=%s, message=%s)", s3host.c_str(), errMessage.c_str()); + // check errors(after retrying) + if(!isLoop && responseCode != 200 && responseCode != 301){ + // parse error message if existed + std::string errMessage; + check_error_message(responseBody.c_str(), responseBody.size(), errMessage); + + if(responseCode == 400){ + S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bad Request(host=%s, message=%s)", s3host.c_str(), errMessage.c_str()); + }else if(responseCode == 403){ + S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Invalid Credentials(host=%s, message=%s)", s3host.c_str(), errMessage.c_str()); + }else if(responseCode == 404){ + if(mount_prefix.empty()){ + S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bucket or directory not found(host=%s, message=%s)", s3host.c_str(), errMessage.c_str()); + }else{ + S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bucket or directory(%s) not found(host=%s, message=%s) - You may need to specify the compat_dir option.", mount_prefix.c_str(), s3host.c_str(), errMessage.c_str()); + } }else{ - S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Bucket or directory(%s) not found(host=%s, message=%s) - You may need to specify the compat_dir option.", mount_prefix.c_str(), s3host.c_str(), errMessage.c_str()); + S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Unable to connect(host=%s, message=%s)", s3host.c_str(), errMessage.c_str()); } - }else{ - S3FS_PRN_CRIT("Failed to check bucket and directory for mount point : Unable to connect(host=%s, message=%s)", s3host.c_str(), errMessage.c_str()); + return EXIT_FAILURE; } - return EXIT_FAILURE; + }else{ + // break loop + isLoop = false; } } - s3fscurl.DestroyCurlHandle(); // make sure remote mountpath exists and is a directory if(!mount_prefix.empty()){ diff --git a/src/s3fs_cred.cpp b/src/s3fs_cred.cpp index c903322f5e..f26c9f3a84 100644 --- a/src/s3fs_cred.cpp +++ b/src/s3fs_cred.cpp @@ -35,6 +35,8 @@ #include "curl.h" #include "string_util.h" #include "metaheader.h" +#include "threadpoolman.h" +#include "s3fs_threadreqs.h" //------------------------------------------------------------------- // Symbols @@ -290,7 +292,7 @@ bool S3fsCred::SetIAMRole(const char* role) return true; } -const std::string& S3fsCred::GetIAMRole() const +const std::string& S3fsCred::GetIAMRoleHasLock() const { return IAM_role; } @@ -336,7 +338,7 @@ bool S3fsCred::GetIAMCredentialsURL(std::string& url, bool check_iam_role) S3FS_PRN_ERR("IAM role name is empty."); return false; } - S3FS_PRN_INFO3("[IAM role=%s]", GetIAMRole().c_str()); + S3FS_PRN_INFO3("[IAM role=%s]", GetIAMRoleHasLock().c_str()); } if(is_ecs){ @@ -356,15 +358,15 @@ bool S3fsCred::GetIAMCredentialsURL(std::string& url, bool check_iam_role) // in the S3fsCurl::GetIAMv2ApiToken method (when retrying). // if(GetIMDSVersion() > 1){ - S3fsCurl s3fscurl; std::string token; - int result = s3fscurl.GetIAMv2ApiToken(S3fsCred::IAMv2_token_url, S3fsCred::IAMv2_token_ttl, S3fsCred::IAMv2_token_ttl_hdr, token); + int result = get_iamv2api_token_request(std::string(S3fsCred::IAMv2_token_url), S3fsCred::IAMv2_token_ttl, std::string(S3fsCred::IAMv2_token_ttl_hdr), token); + if(-ENOENT == result){ // If we get a 404 back when requesting the token service, // then it's highly likely we're running in an environment // that doesn't support the AWS IMDSv2 API, so we'll skip // the token retrieval in the future. - SetIMDSVersion(1); + SetIMDSVersionHasLock(1); }else if(result != 0){ // If we get an unexpected error when retrieving the API @@ -375,13 +377,13 @@ bool S3fsCred::GetIAMCredentialsURL(std::string& url, bool check_iam_role) }else{ // Set token - if(!SetIAMv2APIToken(token)){ + if(!SetIAMv2APITokenHasLock(token)){ S3FS_PRN_ERR("Error storing IMDSv2 API token(%s).", token.c_str()); } } } if(check_iam_role){ - url = IAM_cred_url + GetIAMRole(); + url = IAM_cred_url + GetIAMRoleHasLock(); }else{ url = IAM_cred_url; } @@ -389,7 +391,7 @@ bool S3fsCred::GetIAMCredentialsURL(std::string& url, bool check_iam_role) return true; } -int S3fsCred::SetIMDSVersion(int version) +int S3fsCred::SetIMDSVersionHasLock(int version) { int old = IAM_api_version; IAM_api_version = version; @@ -401,7 +403,7 @@ int S3fsCred::GetIMDSVersion() const return IAM_api_version; } -bool S3fsCred::SetIAMv2APIToken(const std::string& token) +bool S3fsCred::SetIAMv2APITokenHasLock(const std::string& token) { S3FS_PRN_INFO3("Setting AWS IMDSv2 API token to %s", token.c_str()); @@ -427,35 +429,27 @@ const std::string& S3fsCred::GetIAMv2APIToken() const // bool S3fsCred::LoadIAMCredentials() { - // url(check iam role) std::string url; + std::string striamtoken; + std::string stribmsecret; + std::string cred; + // get parameters(check iam role) if(!GetIAMCredentialsURL(url, true)){ return false; } - - const char* iam_v2_token = nullptr; - std::string str_iam_v2_token; if(GetIMDSVersion() > 1){ - str_iam_v2_token = GetIAMv2APIToken(); - iam_v2_token = str_iam_v2_token.c_str(); + striamtoken = GetIAMv2APIToken(); } - - const char* ibm_secret_access_key = nullptr; - std::string str_ibm_secret_access_key; if(IsIBMIAMAuth()){ - str_ibm_secret_access_key = AWSSecretAccessKey; - ibm_secret_access_key = str_ibm_secret_access_key.c_str(); + stribmsecret = AWSSecretAccessKey; } - S3fsCurl s3fscurl; - std::string response; - if(!s3fscurl.GetIAMCredentials(url.c_str(), iam_v2_token, ibm_secret_access_key, response)){ - return false; - } - - if(!SetIAMCredentials(response.c_str())){ - S3FS_PRN_ERR("Something error occurred, could not set IAM role name."); + // Get IAM Credentials + if(0 == get_iamcred_request(url, striamtoken, stribmsecret, cred)){ + S3FS_PRN_DBG("Succeed to set IAM credentials"); + }else{ + S3FS_PRN_ERR("Something error occurred, could not set IAM credentials."); return false; } return true; @@ -466,40 +460,45 @@ bool S3fsCred::LoadIAMCredentials() // bool S3fsCred::LoadIAMRoleFromMetaData() { - const std::lock_guard lock(token_lock); + if(!load_iamrole){ + // nothing to do + return true; + } - if(load_iamrole){ - // url(not check iam role) - std::string url; + std::string url; + std::string iamtoken; + { + const std::lock_guard lock(token_lock); + // url(not check iam role) if(!GetIAMCredentialsURL(url, false)){ return false; } - const char* iam_v2_token = nullptr; - std::string str_iam_v2_token; if(GetIMDSVersion() > 1){ - str_iam_v2_token = GetIAMv2APIToken(); - iam_v2_token = str_iam_v2_token.c_str(); + iamtoken = GetIAMv2APIToken(); } + } - S3fsCurl s3fscurl; - std::string token; - if(!s3fscurl.GetIAMRoleFromMetaData(url.c_str(), iam_v2_token, token)){ - return false; - } + // Get IAM Role token + std::string token; + if(0 != get_iamrole_request(url, iamtoken, token)){ + S3FS_PRN_ERR("failed to get IAM Role token from meta data."); + return false; + } - if(!SetIAMRoleFromMetaData(token.c_str())){ - S3FS_PRN_ERR("Something error occurred, could not set IAM role name."); - return false; - } - S3FS_PRN_INFO("loaded IAM role name = %s", GetIAMRole().c_str()); + // Set + if(!SetIAMRoleFromMetaData(token.c_str())){ + S3FS_PRN_ERR("Something error occurred, could not set IAM role name."); + return false; } return true; } bool S3fsCred::SetIAMCredentials(const char* response) { + const std::lock_guard lock(token_lock); + S3FS_PRN_INFO3("IAM credential response = \"%s\"", response); iamcredmap_t keyval; @@ -530,6 +529,8 @@ bool S3fsCred::SetIAMCredentials(const char* response) bool S3fsCred::SetIAMRoleFromMetaData(const char* response) { + const std::lock_guard lock(token_lock); + S3FS_PRN_INFO3("IAM role name response = \"%s\"", response ? response : "(null)"); std::string rolename; @@ -1372,7 +1373,7 @@ int S3fsCred::DetectParam(const char* arg) SetIAMTokenField("\"access_token\""); SetIAMExpiryField("\"expiration\""); SetIAMFieldCount(2); - SetIMDSVersion(1); + SetIMDSVersionHasLock(1); set_builtin_cred_opts = true; return 0; } @@ -1399,7 +1400,7 @@ int S3fsCred::DetectParam(const char* arg) } if(0 == strcmp(arg, "imdsv1only")){ - SetIMDSVersion(1); + SetIMDSVersionHasLock(1); set_builtin_cred_opts = true; return 0; } @@ -1410,7 +1411,7 @@ int S3fsCred::DetectParam(const char* arg) return -1; } SetIsECS(true); - SetIMDSVersion(1); + SetIMDSVersionHasLock(1); SetIAMCredentialsURL("http://169.254.170.2"); SetIAMFieldCount(5); set_builtin_cred_opts = true; diff --git a/src/s3fs_cred.h b/src/s3fs_cred.h index 88eb8262c0..a14c38c3ad 100644 --- a/src/s3fs_cred.h +++ b/src/s3fs_cred.h @@ -115,14 +115,24 @@ class S3fsCred bool SetIsIBMIAMAuth(bool flag); - int SetIMDSVersion(int version) REQUIRES(S3fsCred::token_lock); + int SetIMDSVersionHasLock(int version) REQUIRES(S3fsCred::token_lock); + int SetIMDSVersion(int version) + { + const std::lock_guard lock(token_lock); + return SetIMDSVersionHasLock(version); + } int GetIMDSVersion() const REQUIRES(S3fsCred::token_lock); - bool SetIAMv2APIToken(const std::string& token) REQUIRES(S3fsCred::token_lock); + bool SetIAMv2APITokenHasLock(const std::string& token) REQUIRES(S3fsCred::token_lock); const std::string& GetIAMv2APIToken() const REQUIRES(S3fsCred::token_lock); bool SetIAMRole(const char* role) REQUIRES(S3fsCred::token_lock); - const std::string& GetIAMRole() const REQUIRES(S3fsCred::token_lock); + const std::string& GetIAMRoleHasLock() const REQUIRES(S3fsCred::token_lock); + const std::string& GetIAMRole() const + { + const std::lock_guard lock(token_lock); + return GetIAMRoleHasLock(); + } bool IsSetIAMRole() const REQUIRES(S3fsCred::token_lock); size_t SetIAMFieldCount(size_t field_count); std::string SetIAMCredentialsURL(const char* url); @@ -142,8 +152,8 @@ class S3fsCred bool GetIAMCredentialsURL(std::string& url, bool check_iam_role) REQUIRES(S3fsCred::token_lock); bool LoadIAMCredentials() REQUIRES(S3fsCred::token_lock); - bool SetIAMCredentials(const char* response) REQUIRES(S3fsCred::token_lock); - bool SetIAMRoleFromMetaData(const char* response) REQUIRES(S3fsCred::token_lock); + bool SetIAMCredentials(const char* response); + bool SetIAMRoleFromMetaData(const char* response); bool SetExtCredLib(const char* arg); bool IsSetExtCredLib() const; diff --git a/src/s3fs_help.cpp b/src/s3fs_help.cpp index 27510dfa38..46ca481fff 100644 --- a/src/s3fs_help.cpp +++ b/src/s3fs_help.cpp @@ -244,7 +244,7 @@ static constexpr char help_string[] = "\n" " parallel_count (default=\"5\")\n" " - number of parallel request for uploading big objects.\n" - " s3fs uploads large object (over 20MB) by multipart post request, \n" + " s3fs uploads large object (over 20MB) by multipart upload request, \n" " and sends parallel requests.\n" " This option limits parallel request count which s3fs requests \n" " at once. It is necessary to set this value depending on a CPU \n" diff --git a/src/s3fs_threadreqs.cpp b/src/s3fs_threadreqs.cpp new file mode 100644 index 0000000000..33c2073545 --- /dev/null +++ b/src/s3fs_threadreqs.cpp @@ -0,0 +1,591 @@ +/* + * s3fs - FUSE-based file system backed by Amazon S3 + * + * Copyright(C) 2007 Randy Rizun + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#include "common.h" +#include "s3fs_threadreqs.h" +#include "threadpoolman.h" +#include "curl_util.h" +#include "s3fs_logger.h" + +//------------------------------------------------------------------- +// Thread Worker functions for MultiThread Request +//------------------------------------------------------------------- +// +// Thread Worker function for head request +// +void* head_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam || !pthparam->pmeta){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Head Request [path=%s][pmeta=%p]", pthparam->path.c_str(), pthparam->pmeta); + + S3fsCurl s3fscurl; + pthparam->result = s3fscurl.HeadRequest(pthparam->path.c_str(), *(pthparam->pmeta)); + + return reinterpret_cast(pthparam->result); +} + +// +// Thread Worker function for delete request +// +void* delete_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Delete Request [path=%s]", pthparam->path.c_str()); + + S3fsCurl s3fscurl; + pthparam->result = s3fscurl.DeleteRequest(pthparam->path.c_str()); + + return reinterpret_cast(pthparam->result); +} + +// +// Thread Worker function for put head request +// +void* put_head_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Put Head Request [path=%s][meta count=%lu][is copy=%s]", pthparam->path.c_str(), pthparam->meta.size(), (pthparam->isCopy ? "true" : "false")); + + S3fsCurl s3fscurl(true); + pthparam->result = s3fscurl.PutHeadRequest(pthparam->path.c_str(), pthparam->meta, pthparam->isCopy); + + return reinterpret_cast(pthparam->result); +} + +// +// Thread Worker function for put request +// +void* put_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Put Request [path=%s][meta count=%lu][fd=%d][use_ahbe=%s]", pthparam->path.c_str(), pthparam->meta.size(), pthparam->fd, (pthparam->ahbe ? "true" : "false")); + + S3fsCurl s3fscurl(pthparam->ahbe); + pthparam->result = s3fscurl.PutRequest(pthparam->path.c_str(), pthparam->meta, pthparam->fd); + + return reinterpret_cast(pthparam->result); +} + +// +// Thread Worker function for list bucket request +// +void* list_bucket_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam || !(pthparam->presponseBody)){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("List Bucket Request [path=%s][query=%s]", pthparam->path.c_str(), pthparam->query.c_str()); + + S3fsCurl s3fscurl; + if(0 == (pthparam->result = s3fscurl.ListBucketRequest(pthparam->path.c_str(), pthparam->query.c_str()))){ + *(pthparam->presponseBody) = s3fscurl.GetBodyData(); + } + return reinterpret_cast(pthparam->result); +} + +// +// Thread Worker function for check service request +// +void* check_service_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam || !(pthparam->presponseCode) || !(pthparam->presponseBody)){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Check Service Request [path=%s][support compat dir=%s][force No SSE=%s]", pthparam->path.c_str(), (pthparam->support_compat_dir ? "true" : "false"), (pthparam->forceNoSSE ? "true" : "false")); + + S3fsCurl s3fscurl; + if(0 == (pthparam->result = s3fscurl.CheckBucket(pthparam->path.c_str(), pthparam->support_compat_dir, pthparam->forceNoSSE))){ + *(pthparam->presponseCode) = s3fscurl.GetLastResponseCode(); + *(pthparam->presponseBody) = s3fscurl.GetBodyData(); + } + return reinterpret_cast(pthparam->result); +} + +// +// Worker function for pre multipart upload request +// +void* pre_multipart_upload_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Pre Multipart Upload Request [path=%s][meta count=%lu]", pthparam->path.c_str(), pthparam->meta.size()); + + S3fsCurl s3fscurl(true); + pthparam->result = s3fscurl.PreMultipartUploadRequest(pthparam->path.c_str(), pthparam->meta, pthparam->upload_id); + + return reinterpret_cast(pthparam->result); +} + +// +// Worker function for complete multipart upload request +// +void* complete_multipart_upload_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Complete Multipart Upload Request [path=%s][upload id=%s][etaglist=%lu]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->etaglist.size()); + + S3fsCurl s3fscurl(true); + pthparam->result = s3fscurl.MultipartUploadComplete(pthparam->path.c_str(), pthparam->upload_id, pthparam->etaglist); + + return reinterpret_cast(pthparam->result); +} + +// +// Worker function for abort multipart upload request +// +void* abort_multipart_upload_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Abort Multipart Upload Request [path=%s][upload id=%s]", pthparam->path.c_str(), pthparam->upload_id.c_str()); + + S3fsCurl s3fscurl(true); + pthparam->result = s3fscurl.AbortMultipartUpload(pthparam->path.c_str(), pthparam->upload_id); + + return reinterpret_cast(pthparam->result); +} + +// +// Thread Worker function for get object request +// +void* get_object_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Get Object Request [path=%s][fd=%d][start=%lld][size=%lld]", pthparam->path.c_str(), pthparam->fd, static_cast(pthparam->start), static_cast(pthparam->size)); + + sse_type_t ssetype = sse_type_t::SSE_DISABLE; + std::string ssevalue; + if(!get_object_sse_type(pthparam->path.c_str(), ssetype, ssevalue)){ + S3FS_PRN_WARN("Failed to get SSE type for file(%s).", pthparam->path.c_str()); + } + + S3fsCurl s3fscurl; + pthparam->result = s3fscurl.GetObjectRequest(pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, ssetype, ssevalue); + + return reinterpret_cast(pthparam->result); +} + +//------------------------------------------------------------------- +// Utility functions +//------------------------------------------------------------------- +// +// Calls S3fsCurl::HeadRequest via head_req_threadworker +// +int head_request(const std::string& strpath, headers_t& header) +{ + // parameter for thread worker + head_req_thparam thargs; + thargs.path = strpath; + thargs.pmeta = &header; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = head_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Await Head Request Thread Worker [path=%s]", strpath.c_str()); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_DBG("Await Head Request by error(%d) [path=%s]", thargs.result, strpath.c_str()); + return thargs.result; + } + + return 0; +} + +// +// Calls S3fsCurl::DeleteRequest via delete_req_threadworker +// +int delete_request(const std::string& strpath) +{ + // parameter for thread worker + delete_req_thparam thargs; + thargs.path = strpath; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = delete_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Await Delete Request Thread Worker [path=%s]", strpath.c_str()); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_DBG("Await Delete Request by error(%d) [path=%s]", thargs.result, strpath.c_str()); + return thargs.result; + } + return 0; +} + +// +// Calls S3fsCurl::PutHeadRequest via put_head_req_threadworker +// +int put_head_request(const std::string& strpath, const headers_t& meta, bool is_copy) +{ + // parameter for thread worker + put_head_req_thparam thargs; + thargs.path = strpath; + thargs.meta = meta; // copy + thargs.isCopy = is_copy; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = put_head_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Await Put Head Request Thread Worker [path=%s][meta count=%lu][is copy=%s]", strpath.c_str(), meta.size(), (is_copy ? "true" : "false")); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_ERR("Await Put Head Request by error(%d) [path=%s][meta count=%lu][is copy=%s]", thargs.result, strpath.c_str(), meta.size(), (is_copy ? "true" : "false")); + return thargs.result; + } + return 0; +} + +// +// Calls S3fsCurl::PutRequest via put_req_threadworker +// +int put_request(const std::string& strpath, const headers_t& meta, int fd, bool ahbe) +{ + // parameter for thread worker + put_req_thparam thargs; + thargs.path = strpath; + thargs.meta = meta; // copy + thargs.fd = fd; // fd=-1 means for creating zero byte object. + thargs.ahbe = ahbe; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = put_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Await Put Request Thread Worker [path=%s][meta count=%lu][fd=%d][use_ahbe=%s]", strpath.c_str(), meta.size(), fd, (ahbe ? "true" : "false")); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_ERR("Await Put Request by error(%d) [path=%s][meta count=%lu][fd=%d][use_ahbe=%s]", thargs.result, strpath.c_str(), meta.size(), fd, (ahbe ? "true" : "false")); + return thargs.result; + } + return 0; +} + +// +// Calls S3fsCurl::ListBucketRequest via list_bucket_req_threadworker +// +int list_bucket_request(const std::string& strpath, const std::string& query, std::string& responseBody) +{ + // parameter for thread worker + list_bucket_req_thparam thargs; + thargs.path = strpath; + thargs.query = query; + thargs.presponseBody = &responseBody; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = list_bucket_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Await List Bucket Request Thread Worker [path=%s][query=%s]", strpath.c_str(), query.c_str()); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_ERR("Await List Bucket Request by error(%d) [path=%s][query=%s]", thargs.result, strpath.c_str(), query.c_str()); + return thargs.result; + } + return 0; +} + +// +// Calls S3fsCurl::CheckBucket via check_service_req_threadworker +// +int check_service_request(const std::string& strpath, bool forceNoSSE, bool support_compat_dir, long& responseCode, std::string& responseBody) +{ + // parameter for thread worker + check_service_req_thparam thargs; + thargs.path = strpath; + thargs.forceNoSSE = forceNoSSE; + thargs.support_compat_dir = support_compat_dir; + thargs.presponseCode = &responseCode; + thargs.presponseBody = &responseBody; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = check_service_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Await Check Service Request Thread Worker [path=%s][support compat dir=%s][force No SSE=%s]", strpath.c_str(), (support_compat_dir ? "true" : "false"), (forceNoSSE ? "true" : "false")); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_ERR("Await Check Service Request by error(%d) [path=%s][support compat dir=%s][force No SSE=%s]", thargs.result, strpath.c_str(), (support_compat_dir ? "true" : "false"), (forceNoSSE ? "true" : "false")); + return thargs.result; + } + return 0; +} + +// +// Calls S3fsCurl::PreMultipartUploadRequest via pre_multipart_upload_req_threadworker +// +// [NOTE] +// If the request is successful, sets upload_id. +// +int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id) +{ + // parameter for thread worker + pre_multipart_upload_req_thparam thargs; + thargs.path = path; + thargs.meta = meta; // copy + thargs.upload_id.clear(); // clear + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = pre_multipart_upload_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Pre Multipart Upload Request Thread Worker"); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_ERR("Pre Multipart Upload Request(path=%s) returns with error(%d)", path.c_str(), thargs.result); + return thargs.result; + } + // set upload_id + upload_id = thargs.upload_id; + + return 0; +} + +// +// Calls S3fsCurl::MultipartUploadComplete via complete_multipart_upload_threadworker +// +int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts) +{ + // parameter for thread worker + complete_multipart_upload_req_thparam thargs; + thargs.path = path; + thargs.upload_id = upload_id; + thargs.etaglist = parts; // copy + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = complete_multipart_upload_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Complete Multipart Upload Request Thread Worker"); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_ERR("Complete Multipart Upload Request(path=%s) returns with error(%d)", path.c_str(), thargs.result); + return thargs.result; + } + return 0; +} + +// +// Calls S3fsCurl::AbortMultipartUpload via abort_multipart_upload_req_threadworker +// +int abort_multipart_upload_request(const std::string& path, const std::string& upload_id) +{ + // parameter for thread worker + abort_multipart_upload_req_thparam thargs; + thargs.path = path; + thargs.upload_id = upload_id; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = abort_multipart_upload_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Abort Multipart Upload Request Thread Worker"); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_ERR("Abort Multipart Upload Request(path=%s) returns with error(%d)", path.c_str(), thargs.result); + return thargs.result; + } + return 0; +} + +// +// Calls S3fsCurl::GetObjectRequest via get_object_req_threadworker +// +int get_object_request(const std::string& path, int fd, off_t start, off_t size) +{ + // parameter for thread worker + get_object_req_thparam thargs; + thargs.path = path; + thargs.fd = fd; + thargs.start = start; + thargs.size = size; + thargs.result = 0; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = &thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = get_object_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Await Get Object Request Thread Worker [path=%s][fd=%d][start=%lld][size=%lld]", path.c_str(), fd, static_cast(start), static_cast(size)); + return -EIO; + } + if(0 != thargs.result){ + S3FS_PRN_ERR("Await Get Object Request by error(%d) [path=%s][fd=%d][start=%lld][size=%lld]", thargs.result, path.c_str(), fd, static_cast(start), static_cast(size)); + return thargs.result; + } + return 0; +} + +//------------------------------------------------------------------- +// Direct Call Utility Functions +//------------------------------------------------------------------- +// These functions (mainly IAM token-related) are not called from +// a thread. +// +// [NOTE] +// The request for IAM token calls are called from S3fsCurl::RequestPerform +// method if the IAM token needs to be updated during each request +// processing. (NOTE: Each request is already executed in a thread.) +// If the number of threads has reached the limit when these functions +// are called, they will block until a thread that can execute this +// process is found. +// This may result in all processing being blocked. +// Therefore, the following functions(IAM token requests) will not be +// processed by a thread worker, but will process the request directly. +// +// If it is a different request called from within a thread worker, +// please process it like this. +// + +// +// Directly calls S3fsCurl::GetIAMv2ApiToken +// +int get_iamv2api_token_request(const std::string& strurl, int tokenttl, const std::string& strttlhdr, std::string& token) +{ + S3FS_PRN_INFO3("Get IAMv2 API Toekn Request directly [url=%s][token ttl=%d][ttl header=%s]", strurl.c_str(), tokenttl, strttlhdr.c_str()); + + S3fsCurl s3fscurl; + + return s3fscurl.GetIAMv2ApiToken(strurl.c_str(), tokenttl, strttlhdr.c_str(), token); +} + +// +// Directly calls S3fsCurl::GetIAMRoleFromMetaData +// +int get_iamrole_request(const std::string& strurl, const std::string& striamtoken, std::string& token) +{ + S3FS_PRN_INFO3("Get IAM Role Request directly [url=%s][iam token=%s]", strurl.c_str(), striamtoken.c_str()); + + S3fsCurl s3fscurl; + int result = 0; + if(!s3fscurl.GetIAMRoleFromMetaData(strurl.c_str(), (striamtoken.empty() ? nullptr : striamtoken.c_str()), token)){ + S3FS_PRN_ERR("Something error occurred during getting IAM Role from MetaData."); + result = -EIO; + } + return result; +} + +// +// Directly calls S3fsCurl::GetIAMCredentials +// +int get_iamcred_request(const std::string& strurl, const std::string& striamtoken, const std::string& stribmsecret, std::string& cred) +{ + S3FS_PRN_INFO3("Get IAM Credentials Request directly [url=%s][iam token=%s][ibm secrect access key=%s]", strurl.c_str(), striamtoken.c_str(), stribmsecret.c_str()); + + S3fsCurl s3fscurl; + int result = 0; + if(!s3fscurl.GetIAMCredentials(strurl.c_str(), (striamtoken.empty() ? nullptr : striamtoken.c_str()), (stribmsecret.empty() ? nullptr : stribmsecret.c_str()), cred)){ + S3FS_PRN_ERR("Something error occurred during getting IAM Credentials."); + result = -EIO; + } + return result; +} + +/* +* Local variables: +* tab-width: 4 +* c-basic-offset: 4 +* End: +* vim600: expandtab sw=4 ts=4 fdm=marker +* vim<600: expandtab sw=4 ts=4 +*/ diff --git a/src/s3fs_threadreqs.h b/src/s3fs_threadreqs.h new file mode 100644 index 0000000000..50e5005ba2 --- /dev/null +++ b/src/s3fs_threadreqs.h @@ -0,0 +1,187 @@ +/* + * s3fs - FUSE-based file system backed by Amazon S3 + * + * Copyright(C) 2007 Randy Rizun + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#ifndef S3FS_THREADREQS_H_ +#define S3FS_THREADREQS_H_ + +#include + +#include "common.h" +#include "metaheader.h" +#include "curl.h" + +//------------------------------------------------------------------- +// Structures for MultiThread Request +//------------------------------------------------------------------- +// +// Head Request parameter structure for Thread Pool. +// +struct head_req_thparam +{ + std::string path; + headers_t* pmeta = nullptr; + int result = 0; +}; + +// +// Delete Request parameter structure for Thread Pool. +// +struct delete_req_thparam +{ + std::string path; + int result = 0; +}; + +// +// Put Head Request parameter structure for Thread Pool. +// +struct put_head_req_thparam +{ + std::string path; + headers_t meta; + bool isCopy = false; + int result = 0; +}; + +// +// Put Request parameter structure for Thread Pool. +// +struct put_req_thparam +{ + std::string path; + headers_t meta; + int fd = -1; + bool ahbe = false; + int result = 0; +}; + +// +// List Bucket Request parameter structure for Thread Pool. +// +struct list_bucket_req_thparam +{ + std::string path; + std::string query; + std::string* presponseBody = nullptr; + int result = 0; +}; + +// +// Check Service Request parameter structure for Thread Pool. +// +struct check_service_req_thparam +{ + std::string path; + bool forceNoSSE = false; + bool support_compat_dir = false; + long* presponseCode = nullptr; + std::string* presponseBody = nullptr; + int result = 0; +}; + +// +// Pre Multipart Upload Request parameter structure for Thread Pool. +// +struct pre_multipart_upload_req_thparam +{ + std::string path; + headers_t meta; + std::string upload_id; + int result = 0; +}; + +// +// Complete Multipart Upload Request parameter structure for Thread Pool. +// +struct complete_multipart_upload_req_thparam +{ + std::string path; + std::string upload_id; + etaglist_t etaglist; + int result = 0; +}; + +// +// Abort Multipart Upload Request parameter structure for Thread Pool. +// +struct abort_multipart_upload_req_thparam +{ + std::string path; + std::string upload_id; + int result = 0; +}; + +// +// Get Object Request parameter structure for Thread Pool. +// +struct get_object_req_thparam +{ + std::string path; + int fd = -1; + off_t start = 0; + off_t size = 0; + int result = 0; +}; + +//------------------------------------------------------------------- +// Thread Worker functions for MultiThread Request +//------------------------------------------------------------------- +void* head_req_threadworker(void* arg); +void* delete_req_threadworker(void* arg); +void* put_head_req_threadworker(void* arg); +void* put_req_threadworker(void* arg); +void* list_bucket_req_threadworker(void* arg); +void* check_service_req_threadworker(void* arg); +void* pre_multipart_upload_req_threadworker(void* arg); +void* complete_multipart_upload_threadworker(void* arg); +void* abort_multipart_upload_req_threadworker(void* arg); +void* get_object_req_threadworker(void* arg); + +//------------------------------------------------------------------- +// Utility functions +//------------------------------------------------------------------- +int head_request(const std::string& strpath, headers_t& header); +int delete_request(const std::string& strpath); +int put_head_request(const std::string& strpath, const headers_t& meta, bool is_copy); +int put_request(const std::string& strpath, const headers_t& meta, int fd, bool ahbe); +int list_bucket_request(const std::string& strpath, const std::string& query, std::string& responseBody); +int check_service_request(const std::string& strpath, bool forceNoSSE, bool support_compat_dir, long& responseCode, std::string& responseBody); +int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id); +int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts); +int abort_multipart_upload_request(const std::string& path, const std::string& upload_id); +int get_object_request(const std::string& path, int fd, off_t start, off_t size); + +//------------------------------------------------------------------- +// Direct Call Utility Functions +//------------------------------------------------------------------- +int get_iamv2api_token_request(const std::string& strurl, int tokenttl, const std::string& strttlhdr, std::string& token); +int get_iamrole_request(const std::string& strurl, const std::string& striamtoken, std::string& token); +int get_iamcred_request(const std::string& strurl, const std::string& striamtoken, const std::string& stribmsecret, std::string& cred); + +#endif // S3FS_THREADREQS_H_ + +/* +* Local variables: +* tab-width: 4 +* c-basic-offset: 4 +* End: +* vim600: expandtab sw=4 ts=4 fdm=marker +* vim<600: expandtab sw=4 ts=4 +*/ diff --git a/src/threadpoolman.cpp b/src/threadpoolman.cpp index ccf140e840..df0b2be383 100644 --- a/src/threadpoolman.cpp +++ b/src/threadpoolman.cpp @@ -58,10 +58,41 @@ bool ThreadPoolMan::Instruct(const thpoolman_param& param) S3FS_PRN_WARN("The singleton object is not initialized yet."); return false; } + if(!param.psem){ + S3FS_PRN_ERR("Thread parameter Semaphore is null."); + return false; + } ThreadPoolMan::singleton->SetInstruction(param); return true; } +bool ThreadPoolMan::AwaitInstruct(const thpoolman_param& param) +{ + if(!ThreadPoolMan::singleton){ + S3FS_PRN_WARN("The singleton object is not initialized yet."); + return false; + } + if(param.psem){ + S3FS_PRN_ERR("Thread parameter Semaphore must be null."); + return false; + } + + // Setup local thpoolman_param structure with local Semaphore + thpoolman_param local_param; + Semaphore await_sem(0); + local_param.args = param.args; + local_param.psem = &await_sem; + local_param.pfunc = param.pfunc; + + // Set parameters and run thread worker + ThreadPoolMan::singleton->SetInstruction(local_param); + + // wait until the thread is complete + await_sem.acquire(); + + return true; +} + // // Thread worker // diff --git a/src/threadpoolman.h b/src/threadpoolman.h index 3043474744..22e4d6e969 100644 --- a/src/threadpoolman.h +++ b/src/threadpoolman.h @@ -92,6 +92,7 @@ class ThreadPoolMan static bool Initialize(int count); static void Destroy(); static bool Instruct(const thpoolman_param& pparam); + static bool AwaitInstruct(const thpoolman_param& param); }; #endif // S3FS_THREADPOOLMAN_H_