From 499577c2a99b59e544e89b7ea716ba242bb1ea55 Mon Sep 17 00:00:00 2001 From: Takeshi Nakatani Date: Mon, 15 Jul 2024 06:40:05 +0000 Subject: [PATCH] Refactored for standardizing content and copy handling for Multipart Upload --- src/curl.cpp | 226 +++++++++++++++++----------------------- src/curl.h | 15 ++- src/fdcache_entity.cpp | 90 ++-------------- src/fdcache_entity.h | 1 - src/fdcache_fdinfo.cpp | 161 ++++------------------------ src/fdcache_fdinfo.h | 6 +- src/s3fs_threadreqs.cpp | 112 ++++++++++++++++++++ src/s3fs_threadreqs.h | 20 ++++ 8 files changed, 270 insertions(+), 361 deletions(-) diff --git a/src/curl.cpp b/src/curl.cpp index c3c3b1f84c..c31e55f7cb 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -1227,7 +1227,6 @@ bool S3fsCurl::MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param) if(!s3fscurl || param){ // this callback does not need a parameter return false; } - return s3fscurl->MultipartUploadPartComplete(); } @@ -1239,8 +1238,7 @@ bool S3fsCurl::MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param) if(!s3fscurl || param){ // this callback does not need a parameter return false; } - - return s3fscurl->MixMultipartUploadComplete(); + return s3fscurl->MultipartUploadPartComplete(); } std::unique_ptr S3fsCurl::MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl) @@ -1283,7 +1281,7 @@ std::unique_ptr S3fsCurl::MultipartUploadPartRetryCallback(S3fsCurl* s newcurl->type = s3fscurl->type; // setup new curl object - if(0 != newcurl->MultipartUploadPartSetup(s3fscurl->path.c_str(), part_num, upload_id)){ + if(0 != newcurl->MultipartUploadContentPartSetup(s3fscurl->path.c_str(), part_num, upload_id)){ S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num); return nullptr; } @@ -1327,7 +1325,7 @@ std::unique_ptr S3fsCurl::CopyMultipartUploadRetryCallback(S3fsCurl* s newcurl->type = s3fscurl->type; // setup new curl object - if(0 != newcurl->CopyMultipartUploadSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){ + if(0 != newcurl->MultipartUploadCopyPartSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){ S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num); return nullptr; } @@ -1374,69 +1372,6 @@ int S3fsCurl::MapPutErrorResponse(int result) return result; } -// [NOTE] -// It is a factory method as utility because it requires an S3fsCurl object -// initialized for multipart upload from outside this class. -// -std::unique_ptr S3fsCurl::CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result) -{ - // duplicate fd - if(!tpath || -1 == fd || start < 0 || size <= 0 || !petag){ - S3FS_PRN_ERR("Parameters are wrong: tpath(%s), fd(%d), start(%lld), size(%lld), petag(%s)", SAFESTRPTR(tpath), fd, static_cast(start), static_cast(size), (petag ? "not null" : "null")); - result = -EIO; - return nullptr; - } - result = 0; - - std::unique_ptr s3fscurl(new S3fsCurl(true)); - - if(!is_copy){ - s3fscurl->partdata.fd = fd; - s3fscurl->partdata.startpos = start; - s3fscurl->partdata.size = size; - s3fscurl->partdata.is_copy = is_copy; - s3fscurl->partdata.petag = petag; // [NOTE] be careful, the value is set directly - s3fscurl->b_partdata_startpos = s3fscurl->partdata.startpos; - s3fscurl->b_partdata_size = s3fscurl->partdata.size; - - S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(start), static_cast(size), part_num); - - if(0 != (result = s3fscurl->MultipartUploadPartSetup(tpath, part_num, upload_id))){ - S3FS_PRN_ERR("failed uploading part setup(%d)", result); - return nullptr; - } - }else{ - headers_t meta; - std::string srcresource; - std::string srcurl; - MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl); - meta["x-amz-copy-source"] = srcresource; - - std::ostringstream strrange; - strrange << "bytes=" << start << "-" << (start + size - 1); - meta["x-amz-copy-source-range"] = strrange.str(); - - s3fscurl->b_from = SAFESTRPTR(tpath); - s3fscurl->b_meta = meta; - s3fscurl->partdata.petag = petag; // [NOTE] be careful, the value is set directly - - S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(start), static_cast(size), part_num); - - if(0 != (result = s3fscurl->CopyMultipartUploadSetup(tpath, tpath, part_num, upload_id, meta))){ - S3FS_PRN_ERR("failed uploading part setup(%d)", result); - return nullptr; - } - } - - // Call lazy function - if(!s3fscurl->fpLazySetup || !s3fscurl->fpLazySetup(s3fscurl.get())){ - S3FS_PRN_ERR("failed lazy function setup for uploading part"); - result = -EIO; - return nullptr; - } - return s3fscurl; -} - int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd) { int result; @@ -1475,7 +1410,7 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t& s3fscurl_para->partdata.add_etag_list(list); // initiate upload part for parallel - if(0 != (result = s3fscurl_para->MultipartUploadPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ + if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return result; } @@ -1548,7 +1483,7 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(iter->offset), static_cast(iter->bytes), s3fscurl_para->partdata.get_part_number()); // initiate upload part for parallel - if(0 != (result = s3fscurl_para->MultipartUploadPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ + if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return result; } @@ -1586,7 +1521,7 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(iter->offset + i), static_cast(bytes), s3fscurl_para->partdata.get_part_number()); // initiate upload part for parallel - if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ + if(0 != (result = s3fscurl_para->MultipartUploadCopyPartSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){ S3FS_PRN_ERR("failed uploading part setup(%d)", result); return result; } @@ -3877,6 +3812,63 @@ int S3fsCurl::PreMultipartUploadRequest(const char* tpath, const headers_t& meta return 0; } +int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy) +{ + // duplicate upload_fd + if(!tpath || start < 0 || size <= 0 || !petag || (!is_copy && -1 == upload_fd)){ + S3FS_PRN_ERR("Parameters are wrong: path(%s), upload_fd(%d), start(%lld), size(%lld), petag(%s), is_copy(%s)", SAFESTRPTR(tpath), upload_fd, static_cast(start), static_cast(size), (petag ? "not null" : "null"), (is_copy ? "true" : "false")); + return -EIO; + } + + int result = 0; + + if(!is_copy){ + partdata.fd = upload_fd; + partdata.startpos = start; + partdata.size = size; + partdata.is_copy = is_copy; + partdata.set_etag(petag); // [NOTE] be careful, the value is set directly + b_partdata_startpos = start; + b_partdata_size = size; + + S3FS_PRN_INFO3("Upload Part [path=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(start), static_cast(size), part_num); + + if(0 != (result = MultipartUploadContentPartSetup(tpath, part_num, upload_id))){ + S3FS_PRN_ERR("failed uploading part setup(%d)", result); + return result; + } + }else{ + headers_t meta; + std::string srcresource; + std::string srcurl; + + MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl); + meta["x-amz-copy-source"] = srcresource; + + std::ostringstream strrange; + strrange << "bytes=" << start << "-" << (start + size - 1); + meta["x-amz-copy-source-range"] = strrange.str(); + + b_from = SAFESTRPTR(tpath); + b_meta = meta; + partdata.set_etag(petag); // [NOTE] be careful, the value is set directly + + S3FS_PRN_INFO3("Copy Part [path=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(start), static_cast(size), part_num); + + if(0 != (result = MultipartUploadCopyPartSetup(tpath, tpath, part_num, upload_id, meta))){ + S3FS_PRN_ERR("failed uploading copy part setup(%d)", result); + return result; + } + } + + // Call lazy function + if(!fpLazySetup || !fpLazySetup(this)){ + S3FS_PRN_ERR("failed lazy function setup for uploading part"); + return -EIO; + } + return 0; +} + int S3fsCurl::MultipartUploadComplete(const char* tpath, const std::string& upload_id, const etaglist_t& parts) { S3FS_PRN_INFO3("[tpath=%s][parts=%zu]", SAFESTRPTR(tpath), parts.size()); @@ -4085,7 +4077,7 @@ int S3fsCurl::AbortMultipartUpload(const char* tpath, const std::string& upload_ // Content-MD5: pUNXr/BjKK5G2UKvaRRrOA== // Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZGGieSRlbHZpbmc= // -int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id) +int S3fsCurl::MultipartUploadContentPartSetup(const char* tpath, int part_num, const std::string& upload_id) { S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(partdata.startpos), static_cast(partdata.size), part_num); @@ -4145,37 +4137,7 @@ int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int part_num, const st return 0; } -int S3fsCurl::MultipartUploadPartRequest(const char* tpath, int part_num, const std::string& upload_id) -{ - int result; - - S3FS_PRN_INFO3("[tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast(partdata.startpos), static_cast(partdata.size), part_num); - - // setup - if(0 != (result = S3fsCurl::MultipartUploadPartSetup(tpath, part_num, upload_id))){ - return result; - } - - if(!fpLazySetup || !fpLazySetup(this)){ - S3FS_PRN_ERR("Failed to lazy setup in multipart upload part request."); - return -EIO; - } - - // request - if(0 == (result = RequestPerform())){ - if(!MultipartUploadPartComplete()){ - result = -EIO; - } - } - - // closing - bodydata.clear(); - headdata.clear(); - - return result; -} - -int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta) +int S3fsCurl::MultipartUploadCopyPartSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta) { S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num); @@ -4241,7 +4203,7 @@ int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int par return 0; } -bool S3fsCurl::MultipartUploadPartComplete() +bool S3fsCurl::MultipartUploadContentPartComplete() { auto it = responseHeaders.find("ETag"); if (it == responseHeaders.cend()) { @@ -4262,31 +4224,32 @@ bool S3fsCurl::MultipartUploadPartComplete() } } partdata.petag->etag = etag; - partdata.uploaded = true; + partdata.uploaded = true; return true; } -bool S3fsCurl::CopyMultipartUploadComplete() +bool S3fsCurl::MultipartUploadCopyPartComplete() { std::string etag; partdata.uploaded = simple_parse_xml(bodydata.c_str(), bodydata.size(), "ETag", etag); partdata.petag->etag = peeloff(std::move(etag)); - bodydata.clear(); - headdata.clear(); - return true; } -bool S3fsCurl::MixMultipartUploadComplete() +bool S3fsCurl::MultipartUploadPartComplete() { bool result; if(-1 == partdata.fd){ - result = CopyMultipartUploadComplete(); + result = MultipartUploadCopyPartComplete(); }else{ - result = MultipartUploadPartComplete(); + result = MultipartUploadContentPartComplete(); } + + bodydata.clear(); + headdata.clear(); + return result; } @@ -4297,7 +4260,7 @@ int S3fsCurl::MultipartPutHeadRequest(const std::string& from, const std::string int result; // setup - if(0 != (result = CopyMultipartUploadSetup(from.c_str(), to.c_str(), part_number, upload_id, meta))){ + if(0 != (result = MultipartUploadCopyPartSetup(from.c_str(), to.c_str(), part_number, upload_id, meta))){ S3FS_PRN_ERR("failed multipart put head request setup(from=%s, to=%s, part_number=%d, upload_id=%s) : %d", from.c_str(), to.c_str(), part_number, upload_id.c_str(), result); return result; } @@ -4314,27 +4277,34 @@ int S3fsCurl::MultipartPutHeadRequest(const std::string& from, const std::string return 0; } -int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair) +int S3fsCurl::MultipartUploadPartRequest(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy) { - S3FS_PRN_INFO3("[upload_id=%s][tpath=%s][fd=%d][offset=%lld][size=%lld]", upload_id.c_str(), SAFESTRPTR(tpath), fd, static_cast(offset), static_cast(size)); - - // set - partdata.fd = fd; - partdata.startpos = offset; - partdata.size = size; - b_partdata_startpos = partdata.startpos; - b_partdata_size = partdata.size; - partdata.set_etag(petagpair); + S3FS_PRN_INFO3("Multipart Upload Part [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", SAFESTRPTR(tpath), upload_fd, static_cast(start), static_cast(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false")); - // upload part + // + // Setup + // int result; - if(0 != (result = MultipartUploadPartRequest(tpath, petagpair->part_num, upload_id))){ - S3FS_PRN_ERR("failed uploading %d part by error(%d)", petagpair->part_num, result); + if(0 != (result = MultipartUploadPartSetup(tpath, upload_fd, start, size, part_num, upload_id, petag, is_copy))){ + S3FS_PRN_ERR("Failed pre-setup for Multipart Upload Part [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", SAFESTRPTR(tpath), upload_fd, static_cast(start), static_cast(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false")); return result; } - DestroyCurlHandle(); - return 0; + // + // Send request + // + if(0 == (result = RequestPerform())){ + S3FS_PRN_DBG("Succeed Multipart Upload Part [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", SAFESTRPTR(tpath), upload_fd, static_cast(start), static_cast(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false")); + + if(!MultipartUploadPartComplete()){ + S3FS_PRN_ERR("Failed completion for Multipart Upload Part [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", SAFESTRPTR(tpath), upload_fd, static_cast(start), static_cast(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false")); + result = -EIO; + } + }else{ + S3FS_PRN_ERR("Failed Multipart Upload Part with error(%d) [tpath=%s][upload_fd=%d][start=%lld][size=%lld][part_num=%d][upload_id=%s][is_copy=%s]", result, SAFESTRPTR(tpath), upload_fd, static_cast(start), static_cast(size), part_num, upload_id.c_str(), (is_copy ? "true" : "false")); + } + + return result; } /* diff --git a/src/curl.h b/src/curl.h index 8e830be3fb..0e6e87bc46 100644 --- a/src/curl.h +++ b/src/curl.h @@ -277,10 +277,10 @@ class S3fsCurl } std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token); std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token); - int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id); - int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta); - bool MultipartUploadPartComplete(); - bool CopyMultipartUploadComplete(); + int MultipartUploadContentPartSetup(const char* tpath, int part_num, const std::string& upload_id); + int MultipartUploadCopyPartSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta); + bool MultipartUploadContentPartComplete(); + bool MultipartUploadCopyPartComplete(); int MapPutErrorResponse(int result); public: @@ -289,7 +289,6 @@ class S3fsCurl static bool InitCredentialObject(S3fsCred* pcredobj); static bool InitMimeType(const std::string& strFile); static bool DestroyS3fsCurl(); - static std::unique_ptr CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result); static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd); static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages); @@ -377,13 +376,13 @@ class S3fsCurl int CheckBucket(const char* check_path, bool compat_dir, bool force_no_sse); int ListBucketRequest(const char* tpath, const char* query); int PreMultipartUploadRequest(const char* tpath, const headers_t& meta, std::string& upload_id); + int MultipartUploadPartSetup(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy); int MultipartUploadComplete(const char* tpath, const std::string& upload_id, const etaglist_t& parts); - int MultipartUploadPartRequest(const char* tpath, int part_num, const std::string& upload_id); - bool MixMultipartUploadComplete(); + bool MultipartUploadPartComplete(); int MultipartListRequest(std::string& body); int AbortMultipartUpload(const char* tpath, const std::string& upload_id); int MultipartPutHeadRequest(const std::string& from, const std::string& to, int part_number, const std::string& upload_id, const headers_t& meta); - int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair); + int MultipartUploadPartRequest(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy); // methods(variables) const std::string& GetPath() const { return path; } diff --git a/src/fdcache_entity.cpp b/src/fdcache_entity.cpp index a5b43d6fd0..abde97d7c2 100644 --- a/src/fdcache_entity.cpp +++ b/src/fdcache_entity.cpp @@ -51,25 +51,6 @@ //------------------------------------------------ static constexpr int MAX_MULTIPART_CNT = 10 * 1000; // S3 multipart max count -//------------------------------------------------ -// Structure of parameters to pass to thread -//------------------------------------------------ -// -// Multipart Upload Request parameter structure for Thread Pool. -// -// ([TODO] This is a temporary structure is moved when S3fsMultiCurl is deprecated.) -// -struct multipart_upload_req_thparam -{ - std::string path; - std::string upload_id; - int fd = -1; - off_t start = 0; - off_t size = 0; - etagpair* petagpair = nullptr; - int result = 0; -}; - //------------------------------------------------ // FdEntity class variables //------------------------------------------------ @@ -129,25 +110,6 @@ ino_t FdEntity::GetInode(int fd) return st.st_ino; } -// -// Worker function for multipart upload request -// -// ([TODO] This is a temporary structure is moved when S3fsMultiCurl is deprecated.) -// -void* FdEntity::MultipartUploadThreadWorker(void* arg) -{ - auto* pthparam = static_cast(arg); - if(!pthparam){ - return reinterpret_cast(-EIO); - } - S3FS_PRN_INFO3("Multipart Upload Request [path=%s][upload id=%s][fd=%d][start=%lld][size=%lld][etagpair=%p]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->fd, static_cast(pthparam->start), static_cast(pthparam->size), pthparam->petagpair); - - S3fsCurl s3fscurl(true); - pthparam->result = s3fscurl.MultipartUploadRequest(pthparam->upload_id, pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, pthparam->petagpair); - - return reinterpret_cast(pthparam->result); -} - //------------------------------------------------ // FdEntity methods //------------------------------------------------ @@ -1262,9 +1224,6 @@ int FdEntity::NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start, off_t si return result; } -// -// Common method that calls S3fsCurl::PreMultipartUploadRequest via pre_multipart_upload_request -// // [NOTE] // If the request is successful, initialize upload_id. // @@ -1275,19 +1234,11 @@ int FdEntity::PreMultipartUploadRequest(PseudoFdInfo* pseudo_obj) return -EIO; } - // get upload_id - std::string upload_id; - int result; - if(0 != (result = pre_multipart_upload_request(path, orgmeta, upload_id))){ + int result; + if(0 != (result = pseudo_obj->PreMultipartUploadRequest(path, orgmeta))){ return result; } - // reset upload_id - if(!pseudo_obj->InitialUploadInfo(upload_id)){ - S3FS_PRN_ERR("failed to initialize upload id(%s)", upload_id.c_str()); - return -EIO; - } - // Clear the dirty flag, because the meta data is updated. pending_status = pending_status_t::NO_UPDATE_PENDING; @@ -1320,8 +1271,6 @@ int FdEntity::NoCachePreMultipartUploadRequest(PseudoFdInfo* pseudo_obj) // At no disk space for caching object. // This method is uploading one part of multipart. // -// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.) -// int FdEntity::NoCacheMultipartUploadRequest(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size) { if(-1 == tgfd || !pseudo_obj || !pseudo_obj->IsUploading()){ @@ -1329,40 +1278,23 @@ int FdEntity::NoCacheMultipartUploadRequest(PseudoFdInfo* pseudo_obj, int tgfd, return -EIO; } - // parameter for thread worker - multipart_upload_req_thparam thargs; - thargs.path = path; - thargs.upload_id.clear(); - thargs.fd = tgfd; - thargs.start = start; - thargs.size = size; - thargs.petagpair = nullptr; - thargs.result = 0; - // get upload id - if(!pseudo_obj->GetUploadId(thargs.upload_id)){ + std::string upload_id; + if(!pseudo_obj->GetUploadId(upload_id)){ return -EIO; } // append new part and get it's etag string pointer - if(!pseudo_obj->AppendUploadPart(start, size, false, &(thargs.petagpair))){ + etagpair* petag = nullptr; + if(!pseudo_obj->AppendUploadPart(start, size, false, &petag)){ return -EIO; } - // make parameter for thread pool - thpoolman_param ppoolparam; - ppoolparam.args = &thargs; - ppoolparam.psem = nullptr; // case await - ppoolparam.pfunc = FdEntity::MultipartUploadThreadWorker; - - // send request by thread - if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ - S3FS_PRN_ERR("failed to setup Get Object Request Thread Worker"); - return -EIO; - } - if(0 != thargs.result){ - S3FS_PRN_ERR("Multipart Upload Request(path=%s, upload_id=%s, fd=%d, start=%lld, size=%lld) returns with error(%d)", path.c_str(), thargs.upload_id.c_str(), tgfd, static_cast(start), static_cast(size), thargs.result); - return thargs.result; + // request to thread + int result; + if(0 != (result = await_multipart_upload_part_request(path, tgfd, start, size, petag->part_num, upload_id, petag, false))){ + S3FS_PRN_ERR("Failed No Cache Multipart Upload Part Request by error(%d) [path=%s][upload_id=%s][fd=%d][start=%lld][size=%lld]", result, path.c_str(), upload_id.c_str(), tgfd, static_cast(start), static_cast(size)); + return result; } return 0; } diff --git a/src/fdcache_entity.h b/src/fdcache_entity.h index f761e5ba2b..bb00e5bb0e 100644 --- a/src/fdcache_entity.h +++ b/src/fdcache_entity.h @@ -80,7 +80,6 @@ class FdEntity : public std::enable_shared_from_this private: static int FillFile(int fd, unsigned char byte, off_t size, off_t start); static ino_t GetInode(int fd); - static void* MultipartUploadThreadWorker(void* arg); // ([TODO] This is a temporary method is moved when S3fsMultiCurl is deprecated.) void Clear(); ino_t GetInode() const REQUIRES(FdEntity::fdent_data_lock); diff --git a/src/fdcache_fdinfo.cpp b/src/fdcache_fdinfo.cpp index 5ca83518a8..209bb121bc 100644 --- a/src/fdcache_fdinfo.cpp +++ b/src/fdcache_fdinfo.cpp @@ -40,92 +40,10 @@ #include "threadpoolman.h" #include "s3fs_threadreqs.h" -//------------------------------------------------ -// Structure of parameters to pass to thread -//------------------------------------------------ -// [NOTE] -// The processing related to this is currently temporarily implemented -// in this file, but will be moved to a separate file at a later. -// -struct pseudofdinfo_mpupload_thparam -{ - PseudoFdInfo* ppseudofdinfo = nullptr; - std::string path; - std::string upload_id; - int upload_fd = -1; - off_t start = 0; - off_t size = 0; - bool is_copy = false; - int part_num = -1; - etagpair* petag = nullptr; -}; - -//------------------------------------------------ -// PseudoFdInfo class methods -//------------------------------------------------ -// -// Thread Worker function for uploading -// -void* PseudoFdInfo::MultipartUploadThreadWorker(void* arg) -{ - std::unique_ptr pthparam(static_cast(arg)); - if(!pthparam || !(pthparam->ppseudofdinfo)){ - return reinterpret_cast(-EIO); - } - S3FS_PRN_INFO3("Upload Part Thread [tpath=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast(pthparam->start), static_cast(pthparam->size), pthparam->part_num); - - int result; - { - const std::lock_guard lock(pthparam->ppseudofdinfo->upload_list_lock); - - if(0 != (result = pthparam->ppseudofdinfo->last_result)){ - S3FS_PRN_DBG("Already occurred error, thus this thread worker is exiting."); - - if(!pthparam->ppseudofdinfo->CompleteInstruction(result)){ // result will be overwritten with the same value. - result = -EIO; - } - return reinterpret_cast(result); - } - } - - // setup and make curl object - std::unique_ptr s3fscurl(S3fsCurl::CreateParallelS3fsCurl(pthparam->path.c_str(), pthparam->upload_fd, pthparam->start, pthparam->size, pthparam->part_num, pthparam->is_copy, pthparam->petag, pthparam->upload_id, result)); - if(nullptr == s3fscurl){ - S3FS_PRN_ERR("failed creating s3fs curl object for uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast(pthparam->start), static_cast(pthparam->size), pthparam->part_num); - - // set result for exiting - const std::lock_guard lock(pthparam->ppseudofdinfo->upload_list_lock); - if(!pthparam->ppseudofdinfo->CompleteInstruction(result)){ - result = -EIO; - } - return reinterpret_cast(result); - } - - // Send request and get result - if(0 == (result = s3fscurl->RequestPerform())){ - S3FS_PRN_DBG("succeed uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast(pthparam->start), static_cast(pthparam->size), pthparam->part_num); - if(!s3fscurl->MixMultipartUploadComplete()){ - S3FS_PRN_ERR("failed completion uploading [path=%s][start=%lld][size=%lld][part=%d]", pthparam->path.c_str(), static_cast(pthparam->start), static_cast(pthparam->size), pthparam->part_num); - result = -EIO; - } - }else{ - S3FS_PRN_ERR("failed uploading with error(%d) [path=%s][start=%lld][size=%lld][part=%d]", result, pthparam->path.c_str(), static_cast(pthparam->start), static_cast(pthparam->size), pthparam->part_num); - } - s3fscurl->DestroyCurlHandle(false); - - // set result - const std::lock_guard lock(pthparam->ppseudofdinfo->upload_list_lock); - if(!pthparam->ppseudofdinfo->CompleteInstruction(result)){ - S3FS_PRN_WARN("This thread worker is about to end, so it doesn't return an EIO here and runs to the end."); - } - - return reinterpret_cast(result); -} - //------------------------------------------------ // PseudoFdInfo methods //------------------------------------------------ -PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0), upload_fd(-1), instruct_count(0), completed_count(0), last_result(0), uploaded_sem(0) +PseudoFdInfo::PseudoFdInfo(int fd, int open_flags) : pseudo_fd(-1), physical_fd(fd), flags(0), upload_fd(-1), instruct_count(0), last_result(0), uploaded_sem(0) { if(-1 != physical_fd){ pseudo_fd = PseudoFdManager::Get(); @@ -272,7 +190,6 @@ bool PseudoFdInfo::ResetUploadInfo() upload_id.clear(); upload_list.clear(); instruct_count = 0; - completed_count = 0; last_result = 0; return true; @@ -306,22 +223,6 @@ void PseudoFdInfo::IncreaseInstructionCount() ++instruct_count; } -bool PseudoFdInfo::CompleteInstruction(int result) -{ - if(0 != result){ - last_result = result; - } - - if(0 >= instruct_count){ - S3FS_PRN_ERR("Internal error: instruct_count caused an underflow."); - return false; - } - --instruct_count; - ++completed_count; - - return true; -} - bool PseudoFdInfo::GetUploadInfo(std::string& id, int& fd) const { const std::lock_guard lock(upload_list_lock); @@ -438,10 +339,6 @@ bool PseudoFdInfo::InsertUploadPart(off_t start, off_t size, int part_num, bool return true; } -// [NOTE] -// This method only launches the upload thread. -// Check the maximum number of threads before calling. -// bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy) { //S3FS_PRN_DBG("[path=%s][mplist(%zu)]", SAFESTRPTR(path), mplist.size()); @@ -461,39 +358,24 @@ bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_ return false; } + std::string strpath = SAFESTRPTR(path); + for(auto iter = mplist.cbegin(); iter != mplist.cend(); ++iter){ // Insert upload part etagpair* petag = nullptr; if(!InsertUploadPart(iter->start, iter->size, iter->part_num, is_copy, &petag)){ - S3FS_PRN_ERR("Failed to insert insert upload part(path=%s, start=%lld, size=%lld, part=%d, copy=%s) to mplist", SAFESTRPTR(path), static_cast(iter->start), static_cast(iter->size), iter->part_num, (is_copy ? "true" : "false")); + S3FS_PRN_ERR("Failed to insert Multipart Upload Part to mplist [path=%s][start=%lld][size=%lld][part_num=%d][is_copy=%s]", strpath.c_str(), static_cast(iter->start), static_cast(iter->size), iter->part_num, (is_copy ? "true" : "false")); return false; } - // make parameter for my thread - auto* thargs = new pseudofdinfo_mpupload_thparam; - thargs->ppseudofdinfo = this; - thargs->path = SAFESTRPTR(path); - thargs->upload_id = tmp_upload_id; - thargs->upload_fd = tmp_upload_fd; - thargs->start = iter->start; - thargs->size = iter->size; - thargs->is_copy = is_copy; - thargs->part_num = iter->part_num; - thargs->petag = petag; - - // make parameter for thread pool - thpoolman_param ppoolparam; - ppoolparam.args = thargs; - ppoolparam.psem = &uploaded_sem; - ppoolparam.pfunc = PseudoFdInfo::MultipartUploadThreadWorker; - - // setup instruction - if(!ThreadPoolMan::Instruct(ppoolparam)){ - S3FS_PRN_ERR("failed setup instruction for uploading."); - delete thargs; + // setup instruction and request on another thread + int result; + if(0 != (result = multipart_upload_part_request(strpath, tmp_upload_fd, iter->start, iter->size, iter->part_num, tmp_upload_id, petag, is_copy, &uploaded_sem, &upload_list_lock, &last_result))){ + S3FS_PRN_ERR("failed setup instruction for Multipart Upload Part Request by erro(%d) [path=%s][start=%lld][size=%lld][part_num=%d][is_copy=%s]", result, strpath.c_str(), static_cast(iter->start), static_cast(iter->size), iter->part_num, (is_copy ? "true" : "false")); return false; } + // Count up the number of internally managed threads IncreaseInstructionCount(); } return true; @@ -525,22 +407,23 @@ bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_li // [NOTE] // If the request is successful, initialize upload_id. // -bool PseudoFdInfo::PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta) +int PseudoFdInfo::PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta) { // get upload_id std::string new_upload_id; - if(0 != pre_multipart_upload_request(strpath, meta, new_upload_id)){ - return false; + int result; + if(0 != (result = pre_multipart_upload_request(strpath, meta, new_upload_id))){ + return result; } // reset upload_id if(!RowInitialUploadInfo(new_upload_id, false/* not need to cancel */)){ S3FS_PRN_ERR("failed to setup multipart upload(set upload id to object)"); - return false; + return -EIO; } S3FS_PRN_DBG("succeed to setup multipart upload(set upload id to object)"); - return true; + return 0; } // @@ -626,8 +509,9 @@ ssize_t PseudoFdInfo::UploadBoundaryLastUntreatedArea(const char* path, headers_ // if(!IsUploading()){ std::string strpath = SAFESTRPTR(path); - if(!PreMultipartUploadRequest(strpath, meta)){ - return -EIO; + int result; + if(0 != (result = PreMultipartUploadRequest(strpath, meta))){ + return result; } } @@ -669,7 +553,7 @@ int PseudoFdInfo::WaitAllThreadsExit() bool is_loop = true; { const std::lock_guard lock(upload_list_lock); - if(0 == instruct_count && 0 == completed_count){ + if(0 == instruct_count){ result = last_result; is_loop = false; } @@ -680,10 +564,7 @@ int PseudoFdInfo::WaitAllThreadsExit() uploaded_sem.acquire(); { const std::lock_guard lock(upload_list_lock); - if(0 < completed_count){ - --completed_count; - } - if(0 == instruct_count && 0 == completed_count){ + if(0 == --instruct_count){ // break loop result = last_result; is_loop = false; @@ -699,7 +580,7 @@ bool PseudoFdInfo::CancelAllThreads() bool need_cancel = false; { const std::lock_guard lock(upload_list_lock); - if(0 < instruct_count && 0 < completed_count){ + if(0 < instruct_count){ S3FS_PRN_INFO("The upload thread is running, so cancel them and wait for the end."); need_cancel = true; last_result = -ECANCELED; // to stop thread running diff --git a/src/fdcache_fdinfo.h b/src/fdcache_fdinfo.h index fe04d1d0fa..6668438242 100644 --- a/src/fdcache_fdinfo.h +++ b/src/fdcache_fdinfo.h @@ -48,24 +48,19 @@ class PseudoFdInfo filepart_list_t upload_list GUARDED_BY(upload_list_lock); petagpool etag_entities GUARDED_BY(upload_list_lock); // list of etag string and part number entities(to maintain the etag entity even if MPPART_INFO is destroyed) int instruct_count GUARDED_BY(upload_list_lock); // number of instructions for processing by threads - int completed_count GUARDED_BY(upload_list_lock); // number of completed processes by thread int last_result GUARDED_BY(upload_list_lock); // the result of thread processing Semaphore uploaded_sem; // use a semaphore to trigger an upload completion like event flag private: - static void* MultipartUploadThreadWorker(void* arg); - bool Clear(); void CloseUploadFd(); bool OpenUploadFd(); bool ResetUploadInfo() REQUIRES(upload_list_lock); bool RowInitialUploadInfo(const std::string& id, bool is_cancel_mp); void IncreaseInstructionCount(); - bool CompleteInstruction(int result) REQUIRES(upload_list_lock); bool GetUploadInfo(std::string& id, int& fd) const; bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy); bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag); - bool PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta); bool CancelAllThreads(); bool ExtractUploadPartsFromUntreatedArea(off_t untreated_start, off_t untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size); bool IsUploadingHasLock() const REQUIRES(upload_list_lock); @@ -95,6 +90,7 @@ class PseudoFdInfo bool AppendUploadPart(off_t start, off_t size, bool is_copy = false, etagpair** ppetag = nullptr); bool ParallelMultipartUploadAll(const char* path, const mp_part_list_t& to_upload_list, const mp_part_list_t& copy_list, int& result); + int PreMultipartUploadRequest(const std::string& strpath, const headers_t& meta); int WaitAllThreadsExit(); ssize_t UploadBoundaryLastUntreatedArea(const char* path, headers_t& meta, FdEntity* pfdent) REQUIRES(pfdent->GetMutex()); diff --git a/src/s3fs_threadreqs.cpp b/src/s3fs_threadreqs.cpp index cffbb5d880..b8d3a8e772 100644 --- a/src/s3fs_threadreqs.cpp +++ b/src/s3fs_threadreqs.cpp @@ -299,6 +299,46 @@ void* pre_multipart_upload_req_threadworker(void* arg) return reinterpret_cast(pthparam->result); } +// +// Worker function for pre multipart upload part request +// +void* multipart_upload_part_req_threadworker(void* arg) +{ + auto* pthparam = static_cast(arg); + if(!pthparam || !pthparam->pthparam_lock || !pthparam->petag || !pthparam->presult){ + return reinterpret_cast(-EIO); + } + S3FS_PRN_INFO3("Multipart Upload Part Worker [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->upload_fd, static_cast(pthparam->start), static_cast(pthparam->size), (pthparam->is_copy ? "true" : "false"), pthparam->part_num); + + // + // Check last thread result + // + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + if(0 != *(pthparam->presult)){ + S3FS_PRN_DBG("Already occurred error(%d), thus this thread worker is exiting.", *(pthparam->presult)); + return reinterpret_cast(*(pthparam->presult)); + } + } + + // + // Request + // + S3fsCurl s3fscurl(true); + int result; + if(0 != (result = s3fscurl.MultipartUploadPartRequest(pthparam->path.c_str(), pthparam->upload_fd, pthparam->start, pthparam->size, pthparam->part_num, pthparam->upload_id, pthparam->petag, pthparam->is_copy))){ + S3FS_PRN_ERR("Failed Multipart Upload Part Worker with error(%d) [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", result, pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->upload_fd, static_cast(pthparam->start), static_cast(pthparam->size), (pthparam->is_copy ? "true" : "false"), pthparam->part_num); + } + + // Set result for exiting + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + *(pthparam->presult) = result; + } + + return reinterpret_cast(result); +} + // // Worker function for complete multipart upload request // @@ -833,6 +873,78 @@ int pre_multipart_upload_request(const std::string& path, const headers_t& meta, return 0; } +// +// Calls S3fsCurl::MultipartUploadPartRequest via multipart_upload_part_req_threadworker +// +int multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy, Semaphore* psem, std::mutex* pthparam_lock, int* req_result) +{ + // parameter for thread worker + auto* thargs = new multipart_upload_part_req_thparam; // free in multipart_upload_part_req_threadworker + thargs->path = path; + thargs->upload_id = upload_id; + thargs->upload_fd = upload_fd; + thargs->start = start; + thargs->size = size; + thargs->is_copy = is_copy; + thargs->part_num = part_num; + thargs->pthparam_lock = pthparam_lock; + thargs->petag = petag; + thargs->presult = req_result; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = thargs; + ppoolparam.psem = psem; + ppoolparam.pfunc = multipart_upload_part_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::Instruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Multipart Upload Part Thread Worker [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", path.c_str(), upload_id.c_str(), upload_fd, static_cast(start), static_cast(size), (is_copy ? "true" : "false"), part_num);; + return -EIO; + } + + return 0; +} + +// +// Calls and Await S3fsCurl::MultipartUploadPartRequest via multipart_upload_part_req_threadworker +// +int await_multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy) +{ + std::mutex thparam_lock; + int req_result = 0; + + // parameter for thread worker + auto* thargs = new multipart_upload_part_req_thparam; // free in multipart_upload_part_req_threadworker + thargs->path = path; + thargs->upload_id = upload_id; + thargs->upload_fd = upload_fd; + thargs->start = start; + thargs->size = size; + thargs->is_copy = is_copy; + thargs->part_num = part_num; + thargs->pthparam_lock = &thparam_lock; + thargs->petag = petag; + thargs->presult = &req_result; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = thargs; + ppoolparam.psem = nullptr; // case await + ppoolparam.pfunc = multipart_upload_part_req_threadworker; + + // send request by thread + if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){ + S3FS_PRN_ERR("failed to setup Await Multipart Upload Part Thread Worker [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", path.c_str(), upload_id.c_str(), upload_fd, static_cast(start), static_cast(size), (is_copy ? "true" : "false"), part_num);; + return -EIO; + } + if(0 != req_result){ + S3FS_PRN_ERR("Await Multipart Upload Part Request by error(%d) [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", req_result, path.c_str(), upload_id.c_str(), upload_fd, static_cast(start), static_cast(size), (is_copy ? "true" : "false"), part_num); + return req_result; + } + return 0; +} + // // Calls S3fsCurl::MultipartUploadComplete via complete_multipart_upload_threadworker // diff --git a/src/s3fs_threadreqs.h b/src/s3fs_threadreqs.h index 65efdb9d68..cd4263541a 100644 --- a/src/s3fs_threadreqs.h +++ b/src/s3fs_threadreqs.h @@ -124,6 +124,23 @@ struct pre_multipart_upload_req_thparam int result = 0; }; +// +// Multipart Upload Part Request parameter structure for Thread Pool. +// +struct multipart_upload_part_req_thparam +{ + std::string path; + std::string upload_id; + int upload_fd = -1; + off_t start = 0; + off_t size = 0; + bool is_copy = false; + int part_num = -1; + std::mutex* pthparam_lock = nullptr; + etagpair* petag = nullptr; + int* presult = nullptr; +}; + // // Complete Multipart Upload Request parameter structure for Thread Pool. // @@ -200,6 +217,7 @@ void* put_req_threadworker(void* arg); void* list_bucket_req_threadworker(void* arg); void* check_service_req_threadworker(void* arg); void* pre_multipart_upload_req_threadworker(void* arg); +void* multipart_upload_part_req_threadworker(void* arg); void* complete_multipart_upload_threadworker(void* arg); void* abort_multipart_upload_req_threadworker(void* arg); void* multipart_put_head_req_threadworker(void* arg); @@ -217,6 +235,8 @@ int put_request(const std::string& strpath, const headers_t& meta, int fd, bool int list_bucket_request(const std::string& strpath, const std::string& query, std::string& responseBody); int check_service_request(const std::string& strpath, bool forceNoSSE, bool support_compat_dir, long& responseCode, std::string& responseBody); int pre_multipart_upload_request(const std::string& path, const headers_t& meta, std::string& upload_id); +int multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy, Semaphore* psem, std::mutex* pthparam_lock, int* req_result); +int await_multipart_upload_part_request(const std::string& path, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy); int complete_multipart_upload_request(const std::string& path, const std::string& upload_id, const etaglist_t& parts); int abort_multipart_upload_request(const std::string& path, const std::string& upload_id); int multipart_put_head_request(const std::string& strfrom, const std::string& strto, off_t size, const headers_t& meta);