diff --git a/src/Makefile.am b/src/Makefile.am index 9780b83018..fc2d56a20b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -36,7 +36,6 @@ s3fs_SOURCES = \ metaheader.cpp \ mpu_util.cpp \ curl.cpp \ - curl_multi.cpp \ curl_util.cpp \ s3objlist.cpp \ cache.cpp \ diff --git a/src/curl.cpp b/src/curl.cpp index e3ad4cbc7e..f360d11073 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -35,7 +35,6 @@ #include "s3fs.h" #include "s3fs_logger.h" #include "curl.h" -#include "curl_multi.h" #include "curl_util.h" #include "s3fs_auth.h" #include "s3fs_cred.h" @@ -1221,132 +1220,6 @@ bool S3fsCurl::SetIPResolveType(const char* value) return true; } -// cppcheck-suppress unmatchedSuppression -// cppcheck-suppress constParameter -// cppcheck-suppress constParameterCallback -bool S3fsCurl::MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param) -{ - if(!s3fscurl || param){ // this callback does not need a parameter - return false; - } - return s3fscurl->MultipartUploadPartComplete(); -} - -// cppcheck-suppress unmatchedSuppression -// cppcheck-suppress constParameter -// cppcheck-suppress constParameterCallback -bool S3fsCurl::MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param) -{ - if(!s3fscurl || param){ // this callback does not need a parameter - return false; - } - return s3fscurl->MultipartUploadPartComplete(); -} - -std::unique_ptr S3fsCurl::MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl) -{ - if(!s3fscurl){ - return nullptr; - } - // parse and get part_num, upload_id. - std::string upload_id; - std::string part_num_str; - int part_num; - off_t tmp_part_num = 0; - if(!get_keyword_value(s3fscurl->url, "uploadId", upload_id)){ - return nullptr; - } - upload_id = urlDecode(upload_id); // decode - if(!get_keyword_value(s3fscurl->url, "partNumber", part_num_str)){ - return nullptr; - } - if(!s3fs_strtoofft(&tmp_part_num, part_num_str.c_str(), /*base=*/ 10)){ - return nullptr; - } - part_num = static_cast(tmp_part_num); - - if(s3fscurl->retry_count >= S3fsCurl::retries){ - S3FS_PRN_ERR("Over retry count(%d) limit(%s:%d).", s3fscurl->retry_count, s3fscurl->path.c_str(), part_num); - return nullptr; - } - - // duplicate request - std::unique_ptr newcurl(new S3fsCurl(s3fscurl->IsUseAhbe())); - newcurl->partdata.petag = s3fscurl->partdata.petag; - newcurl->partdata.fd = s3fscurl->partdata.fd; - newcurl->partdata.startpos = s3fscurl->b_partdata_startpos; - newcurl->partdata.size = s3fscurl->b_partdata_size; - newcurl->b_partdata_startpos = s3fscurl->b_partdata_startpos; - newcurl->b_partdata_size = s3fscurl->b_partdata_size; - newcurl->retry_count = s3fscurl->retry_count + 1; - newcurl->op = s3fscurl->op; - newcurl->type = s3fscurl->type; - - // setup new curl object - if(0 != newcurl->MultipartUploadContentPartSetup(s3fscurl->path.c_str(), part_num, upload_id)){ - S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num); - return nullptr; - } - return newcurl; -} - -std::unique_ptr S3fsCurl::CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl) -{ - if(!s3fscurl){ - return nullptr; - } - // parse and get part_num, upload_id. - std::string upload_id; - std::string part_num_str; - int part_num; - off_t tmp_part_num = 0; - if(!get_keyword_value(s3fscurl->url, "uploadId", upload_id)){ - return nullptr; - } - upload_id = urlDecode(upload_id); // decode - if(!get_keyword_value(s3fscurl->url, "partNumber", part_num_str)){ - return nullptr; - } - if(!s3fs_strtoofft(&tmp_part_num, part_num_str.c_str(), /*base=*/ 10)){ - return nullptr; - } - part_num = static_cast(tmp_part_num); - - if(s3fscurl->retry_count >= S3fsCurl::retries){ - S3FS_PRN_ERR("Over retry count(%d) limit(%s:%d).", s3fscurl->retry_count, s3fscurl->path.c_str(), part_num); - return nullptr; - } - - // duplicate request - std::unique_ptr newcurl(new S3fsCurl(s3fscurl->IsUseAhbe())); - newcurl->partdata.petag = s3fscurl->partdata.petag; - newcurl->b_from = s3fscurl->b_from; - newcurl->b_meta = s3fscurl->b_meta; - newcurl->retry_count = s3fscurl->retry_count + 1; - newcurl->op = s3fscurl->op; - newcurl->type = s3fscurl->type; - - // setup new curl object - if(0 != newcurl->MultipartUploadCopyPartSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){ - S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num); - return nullptr; - } - return newcurl; -} - -std::unique_ptr S3fsCurl::MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl) -{ - if(!s3fscurl){ - return nullptr; - } - - if(-1 == s3fscurl->partdata.fd){ - return S3fsCurl::CopyMultipartUploadRetryCallback(s3fscurl); - }else{ - return S3fsCurl::MultipartUploadPartRetryCallback(s3fscurl); - } -} - int S3fsCurl::MapPutErrorResponse(int result) { if(result != 0){ @@ -1374,185 +1247,6 @@ int S3fsCurl::MapPutErrorResponse(int result) return result; } -int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd) -{ - int result; - std::string upload_id; - struct stat st; - etaglist_t list; - off_t remaining_bytes; - - S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd); - - if(-1 == fstat(fd, &st)){ - S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno); - return -errno; - } - - if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){ - return result; - } - - // Initialize S3fsMultiCurl - S3fsMultiCurl curlmulti(GetMaxParallelCount()); - curlmulti.SetSuccessCallback(S3fsCurl::MultipartUploadPartCallback); - curlmulti.SetRetryCallback(S3fsCurl::MultipartUploadPartRetryCallback); - - // cycle through open fd, pulling off 10MB chunks at a time - for(remaining_bytes = st.st_size; 0 < remaining_bytes; ){ - off_t chunk = remaining_bytes > S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes; - - // s3fscurl sub object - std::unique_ptr s3fscurl_para(new S3fsCurl(true)); - s3fscurl_para->partdata.fd = fd; - s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes; - s3fscurl_para->partdata.size = chunk; - s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos; - s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size; - s3fscurl_para->partdata.add_etag_list(list); - - // initiate upload part for parallel - if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ - S3FS_PRN_ERR("failed uploading part setup(%d)", result); - return result; - } - - // set into parallel object - if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){ - S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath); - return -EIO; - } - remaining_bytes -= chunk; - } - - // Multi request - if(0 != (result = curlmulti.Request())){ - S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - int result2; - if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){ - S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2); - } - return result; - } - - if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){ - return result; - } - return 0; -} - -int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages) -{ - int result; - std::string upload_id; - struct stat st; - etaglist_t list; - - S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd); - - if(-1 == fstat(fd, &st)){ - S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno); - return -errno; - } - - if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){ - return result; - } - - // for copy multipart - std::string srcresource; - std::string srcurl; - MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl); - meta["Content-Type"] = S3fsCurl::LookupMimeType(tpath); - meta["x-amz-copy-source"] = srcresource; - - // Initialize S3fsMultiCurl - S3fsMultiCurl curlmulti(GetMaxParallelCount()); - curlmulti.SetSuccessCallback(S3fsCurl::MixMultipartUploadCallback); - curlmulti.SetRetryCallback(S3fsCurl::MixMultipartUploadRetryCallback); - - for(auto iter = mixuppages.cbegin(); iter != mixuppages.cend(); ++iter){ - if(iter->modified){ - // Multipart upload - std::unique_ptr s3fscurl_para(new S3fsCurl(true)); - s3fscurl_para->partdata.fd = fd; - s3fscurl_para->partdata.startpos = iter->offset; - s3fscurl_para->partdata.size = iter->bytes; - s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos; - s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size; - s3fscurl_para->partdata.add_etag_list(list); - - S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(iter->offset), static_cast(iter->bytes), s3fscurl_para->partdata.get_part_number()); - - // initiate upload part for parallel - if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ - S3FS_PRN_ERR("failed uploading part setup(%d)", result); - return result; - } - - // set into parallel object - if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){ - S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath); - return -EIO; - } - }else{ - // Multipart copy - for(off_t i = 0, bytes = 0; i < iter->bytes; i += bytes){ - std::unique_ptr s3fscurl_para(new S3fsCurl(true)); - - bytes = std::min(GetMultipartCopySize(), iter->bytes - i); - /* every part should be larger than MIN_MULTIPART_SIZE and smaller than FIVE_GB */ - off_t remain_bytes = iter->bytes - i - bytes; - - if ((MIN_MULTIPART_SIZE > remain_bytes) && (0 < remain_bytes)){ - if(FIVE_GB < (bytes + remain_bytes)){ - bytes = (bytes + remain_bytes)/2; - } else{ - bytes += remain_bytes; - } - } - - std::ostringstream strrange; - strrange << "bytes=" << (iter->offset + i) << "-" << (iter->offset + i + bytes - 1); - meta["x-amz-copy-source-range"] = strrange.str(); - - s3fscurl_para->b_from = SAFESTRPTR(tpath); - s3fscurl_para->b_meta = meta; - s3fscurl_para->partdata.add_etag_list(list); - - S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(iter->offset + i), static_cast(bytes), s3fscurl_para->partdata.get_part_number()); - - // initiate upload part for parallel - if(0 != (result = s3fscurl_para->MultipartUploadCopyPartSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ - S3FS_PRN_ERR("failed uploading part setup(%d)", result); - return result; - } - - // set into parallel object - if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){ - S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath); - return -EIO; - } - } - } - } - - // Multi request - if(0 != (result = curlmulti.Request())){ - S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - int result2; - if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){ - S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2); - } - return result; - } - - if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){ - return result; - } - return 0; -} - bool S3fsCurl::MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl) { if(!s3fscurl){ @@ -1796,8 +1490,7 @@ S3fsCurl::S3fsCurl(bool ahbe) : type(REQTYPE::UNSET), requestHeaders(nullptr), LastResponseCode(S3FSCURL_RESPONSECODE_NOTSET), postdata(nullptr), postdata_remaining(0), is_use_ahbe(ahbe), retry_count(0), b_postdata(nullptr), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0), - b_ssekey_pos(-1), b_ssetype(sse_type_t::SSE_DISABLE), - sem(nullptr), completed_tids_lock(nullptr), completed_tids(nullptr), fpLazySetup(nullptr), curlCode(CURLE_OK) + fpLazySetup(nullptr), curlCode(CURLE_OK) { if(!S3fsCurl::ps3fscred){ S3FS_PRN_CRIT("The object of S3fs Credential class is not initialized."); @@ -2080,7 +1773,7 @@ bool S3fsCurl::RemakeHandle() headdata.clear(); LastResponseCode = S3FSCURL_RESPONSECODE_NOTSET; - // count up(only use for multipart) + // retry count up retry_count++; // set from backup @@ -3196,7 +2889,6 @@ bool S3fsCurl::PreHeadRequest(const char* tpath, size_t ssekey_pos) return false; } } - b_ssekey_pos = ssekey_pos; op = "HEAD"; type = REQTYPE::HEAD; @@ -3539,9 +3231,6 @@ int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, off_t partdata.size = size; b_partdata_startpos = start; b_partdata_size = size; - b_ssetype = ssetype; - b_ssevalue = ssevalue; - b_ssekey_pos = -1; // not use this value for get object. return 0; } @@ -3855,8 +3544,6 @@ int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int upload_fd, off_t s strrange << "bytes=" << start << "-" << (start + size - 1); meta["x-amz-copy-source-range"] = strrange.str(); - b_from = SAFESTRPTR(tpath); - b_meta = meta; partdata.set_etag(petag); // [NOTE] be careful, the value is set directly S3FS_PRN_INFO3("Copy Part [path=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(start), static_cast(size), part_num); diff --git a/src/curl.h b/src/curl.h index 0e6e87bc46..f499d48cfb 100644 --- a/src/curl.h +++ b/src/curl.h @@ -34,7 +34,6 @@ #include "common.h" #include "fdcache_page.h" #include "metaheader.h" -#include "psemaphore.h" #include "s3fs_util.h" #include "types.h" @@ -97,10 +96,6 @@ typedef std::vector sseckeylist_t; // class S3fsCurl { - // [TODO] - // If S3fsMultiCurl is discontinued, the following friends will be deleted. - friend class S3fsMultiCurl; - private: enum class REQTYPE : int8_t { UNSET = -1, @@ -187,22 +182,14 @@ class S3fsCurl off_t postdata_remaining; // use by post method and read callback function. filepart partdata; // use by multipart upload/get object callback bool is_use_ahbe; // additional header by extension - int retry_count; // retry count for multipart ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.) + int retry_count; // retry count, this is used only sleep time before retring std::unique_ptr b_infile = {nullptr, &s3fs_fclose}; // backup for retrying const unsigned char* b_postdata; // backup for retrying off_t b_postdata_remaining; // backup for retrying off_t b_partdata_startpos; // backup for retrying off_t b_partdata_size; // backup for retrying - size_t b_ssekey_pos; // backup for retrying - std::string b_ssevalue; // backup for retrying - sse_type_t b_ssetype; // backup for retrying - std::string b_from; // backup for retrying(for copy request) ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.) - headers_t b_meta; // backup for retrying(for copy request) std::string op; // the HTTP verb of the request ("PUT", "GET", etc.) std::string query_string; // request query string - Semaphore *sem; - std::mutex *completed_tids_lock; // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.) - std::vector *completed_tids PT_GUARDED_BY(*completed_tids_lock); // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.) s3fscurl_lazy_setup fpLazySetup; // curl options for lazy setting function CURLcode curlCode; // handle curl return @@ -240,12 +227,6 @@ class S3fsCurl static size_t UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp); static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp); - static bool MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param); - static bool MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param); - static std::unique_ptr MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl); - static std::unique_ptr CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl); - static std::unique_ptr MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl); - // lazy functions for set curl options static bool MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl); static bool CopyMultipartUploadSetCurlOpts(S3fsCurl* s3fscurl); @@ -289,8 +270,6 @@ class S3fsCurl static bool InitCredentialObject(S3fsCred* pcredobj); static bool InitMimeType(const std::string& strFile); static bool DestroyS3fsCurl(); - static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd); - static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages); // class methods(variables) static std::string LookupMimeType(const std::string& name); diff --git a/src/curl_multi.cpp b/src/curl_multi.cpp deleted file mode 100644 index 1747c7eb6f..0000000000 --- a/src/curl_multi.cpp +++ /dev/null @@ -1,371 +0,0 @@ -/* - * s3fs - FUSE-based file system backed by Amazon S3 - * - * Copyright(C) 2007 Randy Rizun - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "s3fs.h" -#include "s3fs_logger.h" -#include "curl_multi.h" -#include "curl.h" -#include "psemaphore.h" - -//------------------------------------------------------------------- -// Class S3fsMultiCurl -//------------------------------------------------------------------- -S3fsMultiCurl::S3fsMultiCurl(int maxParallelism, bool not_abort) : maxParallelism(maxParallelism), not_abort(not_abort), SuccessCallback(nullptr), NotFoundCallback(nullptr), RetryCallback(nullptr), pSuccessCallbackParam(nullptr), pNotFoundCallbackParam(nullptr) -{ -} - -S3fsMultiCurl::~S3fsMultiCurl() -{ - Clear(); -} - -bool S3fsMultiCurl::ClearEx(bool is_all) -{ - s3fscurllist_t::const_iterator iter; - for(iter = clist_req.cbegin(); iter != clist_req.cend(); ++iter){ - S3fsCurl* s3fscurl = iter->get(); - if(s3fscurl){ - s3fscurl->DestroyCurlHandle(); - } - } - clist_req.clear(); - - if(is_all){ - for(iter = clist_all.cbegin(); iter != clist_all.cend(); ++iter){ - S3fsCurl* s3fscurl = iter->get(); - s3fscurl->DestroyCurlHandle(); - } - clist_all.clear(); - } - - S3FS_MALLOCTRIM(0); - - return true; -} - -S3fsMultiSuccessCallback S3fsMultiCurl::SetSuccessCallback(S3fsMultiSuccessCallback function) -{ - S3fsMultiSuccessCallback old = SuccessCallback; - SuccessCallback = function; - return old; -} - -S3fsMultiNotFoundCallback S3fsMultiCurl::SetNotFoundCallback(S3fsMultiNotFoundCallback function) -{ - S3fsMultiNotFoundCallback old = NotFoundCallback; - NotFoundCallback = function; - return old; -} - -S3fsMultiRetryCallback S3fsMultiCurl::SetRetryCallback(S3fsMultiRetryCallback function) -{ - S3fsMultiRetryCallback old = RetryCallback; - RetryCallback = function; - return old; -} - -void* S3fsMultiCurl::SetSuccessCallbackParam(void* param) -{ - void* old = pSuccessCallbackParam; - pSuccessCallbackParam = param; - return old; -} - -void* S3fsMultiCurl::SetNotFoundCallbackParam(void* param) -{ - void* old = pNotFoundCallbackParam; - pNotFoundCallbackParam = param; - return old; -} - -bool S3fsMultiCurl::SetS3fsCurlObject(std::unique_ptr s3fscurl) -{ - if(!s3fscurl){ - return false; - } - clist_all.push_back(std::move(s3fscurl)); - - return true; -} - -int S3fsMultiCurl::MultiPerform() -{ - std::map>> threads; - int result = 0; - bool isMultiHead = false; - int semCount = GetMaxParallelism(); - Semaphore sem(semCount); - - for(auto iter = clist_req.cbegin(); iter != clist_req.cend(); ++iter) { - S3fsCurl* s3fscurl = iter->get(); - if(!s3fscurl){ - continue; - } - - sem.acquire(); - - { - const std::lock_guard lock(completed_tids_lock); - for(const auto &thread_id : completed_tids){ - auto it = threads.find(thread_id); - it->second.first.join(); - long int int_retval = it->second.second.get(); - if (int_retval && !(int_retval == -ENOENT && isMultiHead)) { - S3FS_PRN_WARN("thread terminated with non-zero return code: %ld", int_retval); - } - threads.erase(it); - } - completed_tids.clear(); - } - s3fscurl->sem = &sem; - s3fscurl->completed_tids_lock = &completed_tids_lock; - s3fscurl->completed_tids = &completed_tids; - - isMultiHead |= s3fscurl->GetOp() == "HEAD"; - - std::promise promise; - std::future future = promise.get_future(); - std::thread thread(S3fsMultiCurl::RequestPerformWrapper, s3fscurl, std::move(promise)); - auto thread_id = thread.get_id(); - threads.emplace(std::piecewise_construct, std::forward_as_tuple(thread_id), std::forward_as_tuple(std::move(thread), std::move(future))); - } - - for(int i = 0; i < semCount; ++i){ - sem.acquire(); - } - - const std::lock_guard lock(completed_tids_lock); - for(const auto &thread_id : completed_tids){ - auto it = threads.find(thread_id); - it->second.first.join(); - auto int_retval = it->second.second.get(); - if (int_retval && !(int_retval == -ENOENT && isMultiHead)) { - S3FS_PRN_WARN("thread terminated with non-zero return code: %d", int_retval); - result = int_retval; - } - threads.erase(it); - } - completed_tids.clear(); - - return result; -} - -int S3fsMultiCurl::MultiRead() -{ - int result = 0; - - for(auto iter = clist_req.begin(); iter != clist_req.end(); ){ - std::unique_ptr s3fscurl(std::move(*iter)); - - bool isRetry = false; - bool isPostpone = false; - bool isNeedResetOffset = true; - long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET; - CURLcode curlCode = s3fscurl->GetCurlCode(); - - if(s3fscurl->GetResponseCode(responseCode, false) && curlCode == CURLE_OK){ - if(S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET == responseCode){ - // This is a case where the processing result has not yet been updated (should be very rare). - isPostpone = true; - }else if(400 > responseCode){ - // add into stat cache - // cppcheck-suppress unmatchedSuppression - // cppcheck-suppress knownPointerToBool - if(SuccessCallback && !SuccessCallback(s3fscurl.get(), pSuccessCallbackParam)){ - S3FS_PRN_WARN("error from success callback function(%s).", s3fscurl->url.c_str()); - } - }else if(400 == responseCode){ - // as possibly in multipart - S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); - isRetry = true; - }else if(404 == responseCode){ - // not found - // HEAD requests on readdir_multi_head can return 404 - if(s3fscurl->GetOp() != "HEAD"){ - S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); - } - // Call callback function - // cppcheck-suppress unmatchedSuppression - // cppcheck-suppress knownPointerToBool - if(NotFoundCallback && !NotFoundCallback(s3fscurl.get(), pNotFoundCallbackParam)){ - S3FS_PRN_WARN("error from not found callback function(%s).", s3fscurl->url.c_str()); - } - }else if(500 == responseCode){ - // case of all other result, do retry.(11/13/2013) - // because it was found that s3fs got 500 error from S3, but could success - // to retry it. - S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); - isRetry = true; - }else{ - // Retry in other case. - S3FS_PRN_WARN("failed a request(%ld: %s)", responseCode, s3fscurl->url.c_str()); - isRetry = true; - } - }else{ - S3FS_PRN_ERR("failed a request(Unknown response code: %s)", s3fscurl->url.c_str()); - // Reuse particular file - switch(curlCode){ - case CURLE_OPERATION_TIMEDOUT: - isRetry = true; - isNeedResetOffset = false; - break; - - case CURLE_PARTIAL_FILE: - isRetry = true; - isNeedResetOffset = false; - break; - - default: - S3FS_PRN_ERR("###curlCode: %d msg: %s", curlCode, curl_easy_strerror(curlCode)); - isRetry = true; - break; - } - } - - if(isPostpone){ - clist_req.erase(iter); - clist_req.push_back(std::move(s3fscurl)); // Re-evaluate at the end - iter = clist_req.begin(); - }else{ - if(!isRetry || (!not_abort && 0 != result)){ - // If an EIO error has already occurred, it will be terminated - // immediately even if retry processing is required. - s3fscurl->DestroyCurlHandle(); - }else{ - // Reset offset - if(isNeedResetOffset){ - S3fsCurl::ResetOffset(s3fscurl.get()); - } - - // For retry - std::unique_ptr retrycurl; - const S3fsCurl* retrycurl_ptr = retrycurl.get(); // save this due to std::move below - if(RetryCallback){ - retrycurl = RetryCallback(s3fscurl.get()); - if(nullptr != retrycurl){ - clist_all.push_back(std::move(retrycurl)); - }else{ - // set EIO and wait for other parts. - result = -EIO; - } - } - // cppcheck-suppress mismatchingContainers - if(s3fscurl.get() != retrycurl_ptr){ - s3fscurl->DestroyCurlHandle(); - } - } - iter = clist_req.erase(iter); - } - } - clist_req.clear(); - - if(!not_abort && 0 != result){ - // If an EIO error has already occurred, clear all retry objects. - for(auto iter = clist_all.cbegin(); iter != clist_all.cend(); ++iter){ - S3fsCurl* s3fscurl = iter->get(); - s3fscurl->DestroyCurlHandle(); - } - clist_all.clear(); - } - return result; -} - -int S3fsMultiCurl::Request() -{ - S3FS_PRN_INFO3("[count=%zu]", clist_all.size()); - - // Make request list. - // - // Send multi request loop( with retry ) - // (When many request is sends, sometimes gets "Couldn't connect to server") - // - while(!clist_all.empty()){ - // set curl handle to multi handle - int result; - for(auto iter = clist_all.begin(); iter != clist_all.end(); ++iter){ - clist_req.push_back(std::move(*iter)); - } - clist_all.clear(); - - // Send multi request. - if(0 != (result = MultiPerform())){ - Clear(); - return result; - } - - // Read the result - if(0 != (result = MultiRead())){ - Clear(); - return result; - } - - // Cleanup curl handle in multi handle - ClearEx(false); - } - return 0; -} - -// -// thread function for performing an S3fsCurl request -// -void S3fsMultiCurl::RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise promise) -{ - int result = 0; - if(!s3fscurl){ - // this doesn't signal completion but also never happens - promise.set_value(-EIO); - return; - } - if(s3fscurl->fpLazySetup){ - if(!s3fscurl->fpLazySetup(s3fscurl)){ - S3FS_PRN_ERR("Failed to lazy setup, then respond EIO."); - result = -EIO; - } - } - - if(!result){ - result = s3fscurl->RequestPerform(); - s3fscurl->DestroyCurlHandle(false); - } - - const std::lock_guard lock(*s3fscurl->completed_tids_lock); - s3fscurl->completed_tids->push_back(std::this_thread::get_id()); - s3fscurl->sem->release(); - - promise.set_value(result); -} - -/* -* Local variables: -* tab-width: 4 -* c-basic-offset: 4 -* End: -* vim600: expandtab sw=4 ts=4 fdm=marker -* vim<600: expandtab sw=4 ts=4 -*/ diff --git a/src/curl_multi.h b/src/curl_multi.h deleted file mode 100644 index 69d4745012..0000000000 --- a/src/curl_multi.h +++ /dev/null @@ -1,99 +0,0 @@ -/* - * s3fs - FUSE-based file system backed by Amazon S3 - * - * Copyright(C) 2007 Randy Rizun - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -#ifndef S3FS_CURL_MULTI_H_ -#define S3FS_CURL_MULTI_H_ - -#include -#include -#include -#include -#include - -#include "common.h" - -//---------------------------------------------- -// Typedef -//---------------------------------------------- -class S3fsCurl; - -typedef std::vector> s3fscurllist_t; -typedef bool (*S3fsMultiSuccessCallback)(S3fsCurl* s3fscurl, void* param); // callback for succeed multi request -typedef bool (*S3fsMultiNotFoundCallback)(S3fsCurl* s3fscurl, void* param); // callback for succeed multi request -typedef std::unique_ptr (*S3fsMultiRetryCallback)(S3fsCurl* s3fscurl); // callback for failure and retrying - -//---------------------------------------------- -// class S3fsMultiCurl -//---------------------------------------------- -class S3fsMultiCurl -{ - private: - const int maxParallelism; - - s3fscurllist_t clist_all; // all of curl requests - s3fscurllist_t clist_req; // curl requests are sent - bool not_abort; // complete all requests without aborting on errors - - S3fsMultiSuccessCallback SuccessCallback; - S3fsMultiNotFoundCallback NotFoundCallback; - S3fsMultiRetryCallback RetryCallback; - void* pSuccessCallbackParam; - void* pNotFoundCallbackParam; - - std::mutex completed_tids_lock; - std::vector completed_tids GUARDED_BY(completed_tids_lock); - - private: - bool ClearEx(bool is_all); - int MultiPerform(); - int MultiRead(); - - static void RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise promise); - - public: - explicit S3fsMultiCurl(int maxParallelism, bool not_abort = false); - ~S3fsMultiCurl(); - S3fsMultiCurl(const S3fsMultiCurl&) = delete; - S3fsMultiCurl(S3fsMultiCurl&&) = delete; - S3fsMultiCurl& operator=(const S3fsMultiCurl&) = delete; - S3fsMultiCurl& operator=(S3fsMultiCurl&&) = delete; - - int GetMaxParallelism() const { return maxParallelism; } - - S3fsMultiSuccessCallback SetSuccessCallback(S3fsMultiSuccessCallback function); - S3fsMultiNotFoundCallback SetNotFoundCallback(S3fsMultiNotFoundCallback function); - S3fsMultiRetryCallback SetRetryCallback(S3fsMultiRetryCallback function); - void* SetSuccessCallbackParam(void* param); - void* SetNotFoundCallbackParam(void* param); - bool Clear() { return ClearEx(true); } - bool SetS3fsCurlObject(std::unique_ptr s3fscurl); - int Request(); -}; - -#endif // S3FS_CURL_MULTI_H_ - -/* -* Local variables: -* tab-width: 4 -* c-basic-offset: 4 -* End: -* vim600: expandtab sw=4 ts=4 fdm=marker -* vim<600: expandtab sw=4 ts=4 -*/ diff --git a/src/fdcache_entity.cpp b/src/fdcache_entity.cpp index 7fc37c5c53..296c7e67a8 100644 --- a/src/fdcache_entity.cpp +++ b/src/fdcache_entity.cpp @@ -1611,8 +1611,7 @@ int FdEntity::RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) }else if(pagelist.Size() >= S3fsCurl::GetMultipartSize()){ // multipart uploading - result = S3fsCurl::ParallelMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd); - + result = multipart_upload_request((tpath ? std::string(tpath) : tmppath), tmporgmeta, physical_fd); }else{ // normal uploading (too small part size) @@ -1751,7 +1750,7 @@ int FdEntity::RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) } // multipart uploading with copy api - result = S3fsCurl::ParallelMixMultipartUploadRequest(tpath ? tpath : tmppath.c_str(), tmporgmeta, physical_fd, mixuppages); + result = mix_multipart_upload_request((tpath ? std::string(tpath) : tmppath), tmporgmeta, physical_fd, mixuppages); }else{ // normal uploading (too small part size) diff --git a/src/s3fs.cpp b/src/s3fs.cpp index ac1f195b54..3cc0a46876 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -43,7 +43,6 @@ #include "fdcache_auto.h" #include "fdcache_stat.h" #include "curl.h" -#include "curl_multi.h" #include "curl_util.h" #include "s3objlist.h" #include "cache.h" diff --git a/src/s3fs_threadreqs.cpp b/src/s3fs_threadreqs.cpp index 5865d122b7..dd7fa0add2 100644 --- a/src/s3fs_threadreqs.cpp +++ b/src/s3fs_threadreqs.cpp @@ -711,6 +711,221 @@ int await_multipart_upload_part_request(const std::string& path, int upload_fd, return 0; } +// +// Complete sequence of Multipart Upload Requests processing +// +// Call the following function: +// pre_multipart_upload_request() +// multipart_upload_part_request() +// abort_multipart_upload_request() +// complete_multipart_upload_request() +// +int multipart_upload_request(const std::string& path, const headers_t& meta, int upload_fd) +{ + S3FS_PRN_INFO3("Multipart Upload Request [path=%s][upload_fd=%d]", path.c_str(), upload_fd); + + // Get file stat + struct stat st; + if(-1 == fstat(upload_fd, &st)){ + S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno); + return -errno; + } + + // Get upload id + std::string upload_id; + int result; + if(0 != (result = pre_multipart_upload_request(path, meta, upload_id))){ + return result; + } + + Semaphore upload_sem(0); + std::mutex result_lock; // protects last_result + int last_result = 0; + int req_count = 0; // request count(the part number will be this value +1.) + etaglist_t list; + + // cycle through open upload_fd, pulling off 10MB chunks at a time + for(off_t remaining_bytes = st.st_size; 0 < remaining_bytes; ++req_count){ + // add new etagpair to etaglist_t list + list.emplace_back(nullptr, (req_count + 1)); + etagpair* petag = &list.back(); + + off_t start = st.st_size - remaining_bytes; + off_t chunk = std::min(remaining_bytes, S3fsCurl::GetMultipartSize()); + + S3FS_PRN_INFO3("Multipart Upload Part [path=%s][start=%lld][size=%lld][part_num=%d]", path.c_str(), static_cast(start), static_cast(chunk), (req_count + 1)); + + // setup instruction and request on another thread + if(0 != (result = multipart_upload_part_request(path, upload_fd, start, chunk, (req_count + 1), upload_id, petag, false, &upload_sem, &result_lock, &last_result))){ + S3FS_PRN_ERR("failed setup instruction for Multipart Upload Part Request by error(%d) [path=%s][start=%lld][size=%lld][part_num=%d]", result, path.c_str(), static_cast(start), static_cast(chunk), (req_count + 1)); + + // [NOTE] + // Hold onto result until all request finish. + break; + } + remaining_bytes -= chunk; + } + + // wait for finish all requests + while(req_count > 0){ + upload_sem.acquire(); + --req_count; + } + + // check result + if(0 != result || 0 != last_result){ + S3FS_PRN_ERR("Error occurred in Multipart Upload Request (errno=%d).", (0 != result ? result : last_result)); + + int result2; + if(0 != (result2 = abort_multipart_upload_request(path, upload_id))){ + S3FS_PRN_ERR("Error aborting Multipart Upload Request (errno=%d).", result2); + } + return (0 != result ? result : last_result); + } + + // complete requests + if(0 != (result = complete_multipart_upload_request(path, upload_id, list))){ + S3FS_PRN_ERR("Error occurred in Completion for Multipart Upload Request (errno=%d).", result); + return result; + } + return 0; +} + +// +// Complete sequence of Mix Multipart Upload Requests processing +// +// Call the following function: +// pre_multipart_upload_request() +// multipart_upload_part_request() +// abort_multipart_upload_request() +// complete_multipart_upload_request() +// +int mix_multipart_upload_request(const std::string& path, headers_t& meta, int upload_fd, const fdpage_list_t& mixuppages) +{ + S3FS_PRN_INFO3("Mix Multipart Upload Request [path=%s][upload_fd=%d]", path.c_str(), upload_fd); + + // Get file stat + struct stat st; + if(-1 == fstat(upload_fd, &st)){ + S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno); + return -errno; + } + + // Get upload id + std::string upload_id; + int result; + if(0 != (result = pre_multipart_upload_request(path, meta, upload_id))){ + return result; + } + + // Prepare headers for Multipart Upload Copy + std::string srcresource; + std::string srcurl; + MakeUrlResource(get_realpath(path.c_str()).c_str(), srcresource, srcurl); + meta["Content-Type"] = S3fsCurl::LookupMimeType(path); + meta["x-amz-copy-source"] = srcresource; + + Semaphore upload_sem(0); + std::mutex result_lock; // protects last_result + int last_result = 0; + int req_count = 0; // request count(the part number will be this value +1.) + etaglist_t list; + + for(auto iter = mixuppages.cbegin(); iter != mixuppages.cend(); ++iter){ + if(iter->modified){ + // + // Multipart Upload Content + // + + // add new etagpair to etaglist_t list + list.emplace_back(nullptr, (req_count + 1)); + etagpair* petag = &list.back(); + + S3FS_PRN_INFO3("Mix Multipart Upload Content Part [path=%s][start=%lld][size=%lld][part_num=%d]", path.c_str(), static_cast(iter->offset), static_cast(iter->bytes), (req_count + 1)); + + // setup instruction and request on another thread + if(0 != (result = multipart_upload_part_request(path, upload_fd, iter->offset, iter->bytes, (req_count + 1), upload_id, petag, false, &upload_sem, &result_lock, &last_result))){ + S3FS_PRN_ERR("Failed setup instruction for Mix Multipart Upload Content Part Request by error(%d) [path=%s][start=%lld][size=%lld][part_num=%d]", result, path.c_str(), static_cast(iter->offset), static_cast(iter->bytes), (req_count + 1)); + // [NOTE] + // Hold onto result until all request finish. + break; + } + ++req_count; + + }else{ + // + // Multipart Upload Copy + // + // [NOTE] + // Each part must be larger than MIN_MULTIPART_SIZE and smaller than FIVE_GB, then loop. + // This loop breaks if result is not 0. + // + for(off_t processed_bytes = 0, request_bytes = 0; processed_bytes < iter->bytes && 0 == result; processed_bytes += request_bytes){ + // Set temporary part sizes + request_bytes = std::min(S3fsCurl::GetMultipartCopySize(), (iter->bytes - processed_bytes)); + + // Check lastest part size + off_t remain_bytes = iter->bytes - processed_bytes - request_bytes; + if((0 < remain_bytes) && (remain_bytes < MIN_MULTIPART_SIZE)){ + if(FIVE_GB < (request_bytes + remain_bytes)){ + request_bytes = (request_bytes + remain_bytes) / 2; + } else{ + request_bytes += remain_bytes; + } + } + + // Set headers for Multipart Upload Copy + std::ostringstream strrange; + strrange << "bytes=" << (iter->offset + processed_bytes) << "-" << (iter->offset + processed_bytes + request_bytes - 1); + meta["x-amz-copy-source-range"] = strrange.str(); + + // add new etagpair to etaglist_t list + list.emplace_back(nullptr, (req_count + 1)); + etagpair* petag = &list.back(); + + S3FS_PRN_INFO3("Mix Multipart Upload Copy Part [path=%s][start=%lld][size=%lld][part_num=%d]", path.c_str(), static_cast(iter->offset + processed_bytes), static_cast(request_bytes), (req_count + 1)); + + // setup instruction and request on another thread + if(0 != (result = multipart_upload_part_request(path, upload_fd, (iter->offset + processed_bytes), request_bytes, (req_count + 1), upload_id, petag, true, &upload_sem, &result_lock, &last_result))){ + S3FS_PRN_ERR("Failed setup instruction for Mix Multipart Upload Copy Part Request by error(%d) [path=%s][start=%lld][size=%lld][part_num=%d]", result, path.c_str(), static_cast(iter->offset + processed_bytes), static_cast(request_bytes), (req_count + 1)); + // [NOTE] + // This loop breaks because result is not 0. + } + ++req_count; + } + if(0 != result){ + // [NOTE] + // Hold onto result until all request finish. + break; + } + } + } + + // wait for finish all requests + while(req_count > 0){ + upload_sem.acquire(); + --req_count; + } + + // check result + if(0 != result || 0 != last_result){ + S3FS_PRN_ERR("Error occurred in Mix Multipart Upload Request (errno=%d).", (0 != result ? result : last_result)); + + int result2; + if(0 != (result2 = abort_multipart_upload_request(path, upload_id))){ + S3FS_PRN_ERR("Error aborting Mix Multipart Upload Request (errno=%d).", result2); + } + return (0 != result ? result : last_result); + } + + // complete requests + if(0 != (result = complete_multipart_upload_request(path, upload_id, list))){ + S3FS_PRN_ERR("Error occurred in Completion for Mix Multipart Upload Request (errno=%d).", result); + return result; + } + return 0; +} + // // Calls S3fsCurl::MultipartUploadComplete via complete_multipart_upload_threadworker // diff --git a/src/s3fs_threadreqs.h b/src/s3fs_threadreqs.h index f915f164d4..91d6b9e71a 100644 --- a/src/s3fs_threadreqs.h +++ b/src/s3fs_threadreqs.h @@ -28,6 +28,7 @@ #include "curl.h" #include "s3objlist.h" #include "syncfiller.h" +#include "psemaphore.h" //------------------------------------------------------------------- // Structures for MultiThread Request @@ -217,6 +218,8 @@ void* parallel_get_object_req_threadworker(void* arg); int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id); int multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy, Semaphore* psem, std::mutex* pthparam_lock, int* req_result); int await_multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy); +int multipart_upload_request(const std::string& path, const headers_t& meta, int upload_fd); +int mix_multipart_upload_request(const std::string& path, headers_t& meta, int upload_fd, const fdpage_list_t& mixuppages); int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts); int abort_multipart_upload_request(const std::string& path, const std::string& upload_id); int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta);