Skip to content

Commit

Permalink
Refactored multipart put head request
Browse files Browse the repository at this point in the history
  • Loading branch information
ggtakec committed Nov 11, 2024
1 parent 891b5cd commit fe376d4
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 181 deletions.
145 changes: 12 additions & 133 deletions src/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4252,7 +4252,7 @@ int S3fsCurl::MultipartUploadPartRequest(const char* tpath, int part_num, const
return result;
}

int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta)
int S3fsCurl::CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta)
{
S3FS_PRN_INFO3("[from=%s][to=%s][part=%d]", SAFESTRPTR(from), SAFESTRPTR(to), part_num);

Expand Down Expand Up @@ -4344,20 +4344,6 @@ bool S3fsCurl::MultipartUploadPartComplete()
return true;
}

// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameter
// cppcheck-suppress constParameterCallback
bool S3fsCurl::CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl || param){ // this callback does not need a parameter
return false;
}

// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse
return s3fscurl->CopyMultipartUploadComplete();
}

bool S3fsCurl::CopyMultipartUploadComplete()
{
std::string etag;
Expand All @@ -4381,65 +4367,27 @@ bool S3fsCurl::MixMultipartUploadComplete()
return result;
}

int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta)
int S3fsCurl::MultipartPutHeadRequest(const std::string& from, const std::string& to, int part_number, const std::string& upload_id, const headers_t& meta)
{
int result;
std::string upload_id;
off_t chunk;
off_t bytes_remaining;
etaglist_t list;
S3FS_PRN_INFO3("[from=%s][to=%s][part_number=%d][upload_id=%s]", from.c_str(), to.c_str(), part_number, upload_id.c_str());

S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));
int result;

if(0 != (result = PreMultipartUploadRequest(tpath, meta, upload_id))){
// setup
if(0 != (result = CopyMultipartUploadSetup(from.c_str(), to.c_str(), part_number, upload_id, meta))){
S3FS_PRN_ERR("failed multipart put head request setup(from=%s, to=%s, part_number=%d, upload_id=%s) : %d", from.c_str(), to.c_str(), part_number, upload_id.c_str(), result);
return result;
}
DestroyCurlHandle();

// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback);
curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback);

for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){
chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining;

std::ostringstream strrange;
strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1);
meta["x-amz-copy-source-range"] = strrange.str();

// s3fscurl sub object
std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));
s3fscurl_para->b_from = SAFESTRPTR(tpath);
s3fscurl_para->b_meta = meta;
s3fscurl_para->partdata.add_etag_list(list);

// initiate upload part for parallel
if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}

// set into parallel object
if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
return -EIO;
}
if(!fpLazySetup || !fpLazySetup(this)){
S3FS_PRN_ERR("failed multipart put head request lazysetup(from=%s, to=%s, part_number=%d, upload_id=%s)", from.c_str(), to.c_str(), part_number, upload_id.c_str());
return -EIO;
}

// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
// request
if(0 != (result = RequestPerform())){
return result;
}

if(0 != (result = MultipartUploadComplete(tpath, upload_id, list))){
return result;
}
return 0;
}

Expand All @@ -4466,75 +4414,6 @@ int S3fsCurl::MultipartUploadRequest(const std::string& upload_id, const char* t
return 0;
}

int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t& meta, off_t size)
{
int result;
std::string upload_id;
off_t chunk;
off_t bytes_remaining;
etaglist_t list;

S3FS_PRN_INFO3("[from=%s][to=%s]", SAFESTRPTR(from), SAFESTRPTR(to));

std::string srcresource;
std::string srcurl;
MakeUrlResource(get_realpath(from).c_str(), srcresource, srcurl);

meta["Content-Type"] = S3fsCurl::LookupMimeType(to);
meta["x-amz-copy-source"] = srcresource;

if(0 != (result = PreMultipartUploadRequest(to, meta, upload_id))){
return result;
}
DestroyCurlHandle();

// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::CopyMultipartUploadCallback);
curlmulti.SetRetryCallback(S3fsCurl::CopyMultipartUploadRetryCallback);

for(bytes_remaining = size; 0 < bytes_remaining; bytes_remaining -= chunk){
chunk = bytes_remaining > GetMultipartCopySize() ? GetMultipartCopySize() : bytes_remaining;

std::ostringstream strrange;
strrange << "bytes=" << (size - bytes_remaining) << "-" << (size - bytes_remaining + chunk - 1);
meta["x-amz-copy-source-range"] = strrange.str();

// s3fscurl sub object
std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));
s3fscurl_para->b_from = SAFESTRPTR(from);
s3fscurl_para->b_meta = meta;
s3fscurl_para->partdata.add_etag_list(list);

// initiate upload part for parallel
if(0 != (result = s3fscurl_para->CopyMultipartUploadSetup(from, to, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}

// set into parallel object
if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", to);
return -EIO;
}
}

// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(to), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
}

if(0 != (result = MultipartUploadComplete(to, upload_id, list))){
return result;
}
return 0;
}

/*
* Local variables:
* tab-width: 4
Expand Down
12 changes: 5 additions & 7 deletions src/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ class S3fsCurl
size_t b_ssekey_pos; // backup for retrying
std::string b_ssevalue; // backup for retrying
sse_type_t b_ssetype; // backup for retrying
std::string b_from; // backup for retrying(for copy request)
std::string b_from; // backup for retrying(for copy request) ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
headers_t b_meta; // backup for retrying(for copy request)
std::string op; // the HTTP verb of the request ("PUT", "GET", etc.)
std::string query_string; // request query string
Semaphore *sem;
std::mutex *completed_tids_lock;
std::vector<std::thread::id> *completed_tids PT_GUARDED_BY(*completed_tids_lock);
std::mutex *completed_tids_lock; // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
std::vector<std::thread::id> *completed_tids PT_GUARDED_BY(*completed_tids_lock); // ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.)
s3fscurl_lazy_setup fpLazySetup; // curl options for lazy setting function
CURLcode curlCode; // handle curl return

Expand Down Expand Up @@ -241,7 +241,6 @@ class S3fsCurl
static size_t DownloadWriteCallback(void* ptr, size_t size, size_t nmemb, void* userp);

static bool MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param);
static bool CopyMultipartUploadCallback(S3fsCurl* s3fscurl, void* param);
static bool MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param);
static std::unique_ptr<S3fsCurl> MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl);
static std::unique_ptr<S3fsCurl> CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl);
Expand Down Expand Up @@ -280,7 +279,7 @@ class S3fsCurl
std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token);
std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token);
int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id);
int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, headers_t& meta);
int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta);
bool MultipartUploadPartComplete();
bool CopyMultipartUploadComplete();
int MapPutErrorResponse(int result);
Expand Down Expand Up @@ -385,9 +384,8 @@ class S3fsCurl
bool MixMultipartUploadComplete();
int MultipartListRequest(std::string& body);
int AbortMultipartUpload(const char* tpath, const std::string& upload_id);
int MultipartHeadRequest(const char* tpath, off_t size, headers_t& meta);
int MultipartPutHeadRequest(const std::string& from, const std::string& to, int part_number, const std::string& upload_id, const headers_t& meta);
int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair);
int MultipartRenameRequest(const char* from, const char* to, headers_t& meta, off_t size);

// methods(variables)
const std::string& GetPath() const { return path; }
Expand Down
2 changes: 2 additions & 0 deletions src/curl_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cstdint>
#include <curl/curl.h>
#include <string>
#include "metaheader.h"

enum class sse_type_t : uint8_t;

Expand All @@ -38,6 +39,7 @@ std::string get_header_value(const struct curl_slist* list, const std::string &k
bool MakeUrlResource(const char* realpath, std::string& resourcepath, std::string& url);
std::string prepare_url(const char* url);
bool get_object_sse_type(const char* path, sse_type_t& ssetype, std::string& ssevalue); // implement in s3fs.cpp
int put_headers(const char* path, const headers_t& meta, bool is_copy, bool use_st_size = true); // implement in s3fs.cpp

bool make_md5_from_binary(const char* pstr, size_t length, std::string& md5);
std::string url_to_host(const std::string &url);
Expand Down
3 changes: 0 additions & 3 deletions src/fdcache_entity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2513,9 +2513,6 @@ bool FdEntity::MergeOrgMeta(headers_t& updatemeta)
return (pending_status_t::NO_UPDATE_PENDING != pending_status);
}

// global function in s3fs.cpp
int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size = true);

int FdEntity::UploadPendingHasLock(int fd)
{
int result;
Expand Down
46 changes: 8 additions & 38 deletions src/s3fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "fdcache_stat.h"
#include "curl.h"
#include "curl_multi.h"
#include "curl_util.h"
#include "s3objlist.h"
#include "cache.h"
#include "addhead.h"
Expand Down Expand Up @@ -110,11 +111,6 @@ static bool update_parent_dir_stat= false; // default not updating parent direc
static fsblkcnt_t bucket_block_count; // advertised block count of the bucket
static unsigned long s3fs_block_size = 16 * 1024 * 1024; // s3fs block size is 16MB

//-------------------------------------------------------------------
// Global functions : prototype
//-------------------------------------------------------------------
int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size = true); // [NOTE] global function because this is called from FdEntity class

//-------------------------------------------------------------------
// Static functions : prototype
//-------------------------------------------------------------------
Expand Down Expand Up @@ -849,7 +845,7 @@ static int get_local_fent(AutoFdEntity& autoent, FdEntity **entity, const char*
// create or update s3 meta
// @return fuse return code
//
int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_size)
int put_headers(const char* path, const headers_t& meta, bool is_copy, bool use_st_size)
{
int result;
off_t size;
Expand All @@ -876,11 +872,7 @@ int put_headers(const char* path, headers_t& meta, bool is_copy, bool use_st_siz
}

if(!nocopyapi && !nomultipart && size >= multipart_threshold){
// [TODO]
// This object will be removed after removing S3fsMultiCurl
//
S3fsCurl s3fscurl(true);
if(0 != (result = s3fscurl.MultipartHeadRequest(strpath.c_str(), size, meta))){
if(0 != (result = multipart_put_head_request(strpath, strpath, size, meta))){
return result;
}
}else{
Expand Down Expand Up @@ -1560,14 +1552,9 @@ static int rename_large_object(const char* from, const char* to)
return result;
}

// [TODO]
// This object will be removed after removing S3fsMultiCurl
//
S3fsCurl s3fscurl(true);
if(0 != (result = s3fscurl.MultipartRenameRequest(from, to, meta, buf.st_size))){
if(0 != (result = multipart_put_head_request(std::string(from), std::string(to), buf.st_size, meta))){
return result;
}
s3fscurl.DestroyCurlHandle();

// Rename cache file
FdManager::get()->Rename(from, to);
Expand Down Expand Up @@ -3124,27 +3111,10 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf
continue;
}

// parameter for thread worker
auto* thargs = new multi_head_req_thparam; // free in multi_head_req_threadworker
thargs->psyncfiller = &syncfiller;
thargs->pthparam_lock = &thparam_lock; // for pretrycount and presult member
thargs->pretrycount = &retrycount;
thargs->pnotfound_list = &notfound_list;
thargs->use_wtf8 = use_wtf8;
thargs->path = disppath;
thargs->presult = &req_result;

// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = thargs;
ppoolparam.psem = &multi_head_sem;
ppoolparam.pfunc = multi_head_req_threadworker;

// setup instruction
if(!ThreadPoolMan::Instruct(ppoolparam)){
S3FS_PRN_ERR("failed setup instruction for one header request.");
delete thargs;
return -EIO;
// set one head request
int result;
if(0 != (result = multi_head_request(disppath, syncfiller, thparam_lock, retrycount, notfound_list, use_wtf8, req_result, multi_head_sem))){
return result;
}
++req_count;
}
Expand Down
Loading

0 comments on commit fe376d4

Please sign in to comment.