Skip to content

Commit

Permalink
Refactored multipart put head request
Browse files Browse the repository at this point in the history
  • Loading branch information
ggtakec committed Sep 28, 2024
1 parent 698663a commit d6e2b29
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 58 deletions.
135 changes: 84 additions & 51 deletions src/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4256,7 +4256,10 @@ int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, const
return result;
}

int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta)
//TEST - RM MULTICURL
//int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta)
int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta)
//TEST - RM MULTICURL
{
S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num);

Expand Down Expand Up @@ -4285,7 +4288,10 @@ int S3fsCurl::CopyMultipartPostSetup(const char* from, const char* to, int part_
requestHeaders = curl_slist_sort_insert(requestHeaders, "Content-Type", contype.c_str());

// Make request headers
for(headers_t::iterator iter = meta.begin(); iter != meta.end(); ++iter){
//TEST - RM MULTICURL
// for(headers_t::iterator iter = meta.begin(); iter != meta.end(); ++iter){
for(headers_t::const_iterator iter = meta.begin(); iter != meta.end(); ++iter){
//TEST - RM MULTICURL
std::string key = lower(iter->first);
std::string value = iter->second;
if(key == "x-amz-copy-source"){
Expand Down Expand Up @@ -4385,67 +4391,94 @@ bool S3fsCurl::MixMultipartPostComplete()
return result;
}

int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy)
//TEST - RM MULTICURL
// int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy)
// {
// int result;
// std::string upload_id;
// off_t chunk;
// off_t bytes_remaining;
// etaglist_t list;
//
// S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));
//
// if(0 != (result = PreMultipartPostRequest(tpath, meta, upload_id, is_copy))){
// return result;
// }
// DestroyCurlHandle();
//
// // Initialize S3fsMultiCurl
// S3fsMultiCurl curlmulti(GetMaxParallelCount());
// curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartPostCallback);
// curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartPostRetryCallback);
//
// for(bytes_remaining = size, chunk = 0; 0 < bytes_remaining; bytes_remaining -= chunk){
// chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining;
//
// std::ostringstream strrange;
// strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1);
// meta["x-amz-copy-source-range"] = strrange.str();
//
// // s3fscurl sub object
// std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));
// s3fscurl_para->b_from = SAFESTRPTR(tpath);
// s3fscurl_para->b_meta = meta;
// s3fscurl_para->partdata.add_etag_list(list);
//
// // initiate upload part for parallel
// if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
// S3FS_PRN_ERR("failed uploading part setup(%d)", result);
// return result;
// }
//
// // set into parallel object
// if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
// S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
// return -EIO;
// }
// }
//
// // Multi request
// if(0 != (result = curlmulti.Request())){
// S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
// int result2;
// if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
// S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
// }
// return result;
// }
//
// if(0 != (result = CompleteMultipartPostRequest(tpath, upload_id, list))){
// return result;
// }
// return 0;
// }
///////////////

int S3fsCurl::MultipartPutHeadRequest(const std::string& path, int part_number, std::string& upload_id, const headers_t& meta)
{
int result;
std::string upload_id;
off_t chunk;
off_t bytes_remaining;
etaglist_t list;
S3FS_PRN_INFO3("[path=%s][part_number=%d][upload_id=%s]", path.c_str(), part_number, upload_id.c_str());

S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));
int result;

if(0 != (result = PreMultipartPostRequest(tpath, meta, upload_id, is_copy))){
// setup
if(0 != (result = CopyMultipartPostSetup(path.c_str(), path.c_str(), part_number, upload_id, meta))){
S3FS_PRN_ERR("failed multipart put head request setup(path=%s, part_number=%d, upload_id=%s) : %d", path.c_str(), part_number, upload_id.c_str(), result);
return result;
}
DestroyCurlHandle();

// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartPostCallback);
curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartPostRetryCallback);

for(bytes_remaining = size, chunk = 0; 0 < bytes_remaining; bytes_remaining -= chunk){
chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining;

std::ostringstream strrange;
strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1);
meta["x-amz-copy-source-range"] = strrange.str();

// s3fscurl sub object
std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));
s3fscurl_para->b_from = SAFESTRPTR(tpath);
s3fscurl_para->b_meta = meta;
s3fscurl_para->partdata.add_etag_list(list);

// initiate upload part for parallel
if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}

// set into parallel object
if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
return -EIO;
}
if(!fpLazySetup || !fpLazySetup(this)){
S3FS_PRN_ERR("failed multipart put head request lazysetup(path=%s, part_number=%d, upload_id=%s)", path.c_str(), part_number, upload_id.c_str());
return -EIO;
}

// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
// request
if(0 != (result = RequestPerform())){
return result;
}

if(0 != (result = CompleteMultipartPostRequest(tpath, upload_id, list))){
return result;
}
return 0;
}
//TEST - RM MULTICURL

int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair)
{
Expand Down
14 changes: 12 additions & 2 deletions src/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,17 @@ class S3fsCurl
size_t b_ssekey_pos; // backup for retrying
std::string b_ssevalue; // backup for retrying
sse_type_t b_ssetype; // backup for retrying
//TEST - RM MULTICURL - REMOVE THIS AFTER MULTICURL REMOVING
std::string b_from; // backup for retrying(for copy request)
//TEST - RM MULTICURL - REMOVE THIS AFTER MULTICURL REMOVING
headers_t b_meta; // backup for retrying(for copy request)
std::string op; // the HTTP verb of the request ("PUT", "GET", etc.)
std::string query_string; // request query string
Semaphore *sem;
//TEST - RM MULTICURL - REMOVE THIS AFTER MULTICURL REMOVING
std::mutex *completed_tids_lock;
std::vector<std::thread::id> *completed_tids;
//TEST - RM MULTICURL - REMOVE THIS AFTER MULTICURL REMOVING
s3fscurl_lazy_setup fpLazySetup; // curl options for lazy setting function
CURLcode curlCode; // handle curl return

Expand Down Expand Up @@ -279,7 +283,10 @@ class S3fsCurl
std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token);
std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token);
int UploadMultipartPostSetup(const char* tpath, int part_num, const std::string& upload_id);
int CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta);
//TEST - RM MULTICURL
// int CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta);
int CopyMultipartPostSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta);
//TEST - RM MULTICURL
bool UploadMultipartPostComplete();
bool CopyMultipartPostComplete();
int MapPutErrorResponse(int result);
Expand Down Expand Up @@ -387,7 +394,10 @@ class S3fsCurl
bool MixMultipartPostComplete();
int MultipartListRequest(std::string& body);
int AbortMultipartUpload(const char* tpath, const std::string& upload_id);
int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy);
//TEST - RM MULTICURL
// int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta, bool is_copy);
int MultipartPutHeadRequest(const std::string& path, int part_number, std::string& upload_id, const headers_t& meta);
//TEST - RM MULTICURL
int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair);
int MultipartRenameRequest(const char* from, const char* to, headers_t& meta, off_t size);

Expand Down
16 changes: 11 additions & 5 deletions src/s3fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,13 +943,19 @@ int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_siz
}

if(!nocopyapi && !nomultipart && size >= multipart_threshold){
// [TODO]
// This object will be removed after removing S3fsMultiCurl
//
S3fsCurl s3fscurl(true);
if(0 != (result = s3fscurl.MultipartHeadRequest(strpath.c_str(), size, meta, is_copy))){
//TEST - RM MULTICURL
// // [TODO]
// // This object will be removed after removing S3fsMultiCurl
// //
// S3fsCurl s3fscurl(true);
// if(0 != (result = s3fscurl.MultipartHeadRequest(strpath.c_str(), size, meta, is_copy))){
// return result;
// }

if(0 != (result = multipart_put_head_request(strpath, size, meta))){
return result;
}
//TEST - RM MULTICURL
}else{
// parameter for thread worker
put_head_req_thparam thargs;
Expand Down
Loading

0 comments on commit d6e2b29

Please sign in to comment.