Skip to content

Commit

Permalink
Changed all single requests to go through ThreadPoolMan
Browse files Browse the repository at this point in the history
  • Loading branch information
ggtakec committed Sep 28, 2024
1 parent 1b6c932 commit ef023ad
Show file tree
Hide file tree
Showing 15 changed files with 1,422 additions and 357 deletions.
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ s3fs_SOURCES = \
string_util.cpp \
s3fs_cred.cpp \
s3fs_util.cpp \
s3fs_threadreqs.cpp \
fdcache.cpp \
fdcache_entity.cpp \
fdcache_page.cpp \
Expand Down
56 changes: 20 additions & 36 deletions src/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "s3fs_util.h"
#include "string_util.h"
#include "addhead.h"
#include "s3fs_threadreqs.h"

//-------------------------------------------------------------------
// Symbols
Expand Down Expand Up @@ -1432,14 +1433,13 @@ std::unique_ptr<S3fsCurl> S3fsCurl::CreateParallelS3fsCurl(const char* tpath, in
return s3fscurl;
}

int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd)
int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd)
{
int result;
std::string upload_id;
struct stat st;
etaglist_t list;
off_t remaining_bytes;
S3fsCurl s3fscurl(true);

S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

Expand All @@ -1448,10 +1448,9 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
return -errno;
}

if(0 != (result = s3fscurl.PreMultipartPostRequest(tpath, meta, upload_id, false))){
if(0 != (result = pre_multipart_post_request(std::string(tpath), meta, false, upload_id))){
return result;
}
s3fscurl.DestroyCurlHandle();

// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
Expand Down Expand Up @@ -1488,18 +1487,14 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);

S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(result2 != 0){
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}

return result;
}

if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){
if(0 != (result = complete_multipart_post_request(std::string(tpath), upload_id, list))){
return result;
}
return 0;
Expand All @@ -1511,7 +1506,6 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
std::string upload_id;
struct stat st;
etaglist_t list;
S3fsCurl s3fscurl(true);

S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

Expand All @@ -1520,10 +1514,9 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
return -errno;
}

if(0 != (result = s3fscurl.PreMultipartPostRequest(tpath, meta, upload_id, true))){
if(0 != (result = pre_multipart_post_request(std::string(tpath), meta, true, upload_id))){
return result;
}
s3fscurl.DestroyCurlHandle();

// for copy multipart
std::string srcresource;
Expand Down Expand Up @@ -1606,17 +1599,14 @@ int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& me
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);

S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(result2 != 0){
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
}

if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){
if(0 != (result = complete_multipart_post_request(std::string(tpath), upload_id, list))){
return result;
}
return 0;
Expand Down Expand Up @@ -3406,7 +3396,7 @@ int S3fsCurl::HeadRequest(const char* tpath, headers_t& meta)
return 0;
}

int S3fsCurl::PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy)
int S3fsCurl::PutHeadRequest(const char* tpath, const headers_t& meta, bool is_copy)
{
S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));

Expand All @@ -3430,7 +3420,7 @@ int S3fsCurl::PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy)
requestHeaders = curl_slist_sort_insert(requestHeaders, "Content-Type", contype.c_str());

// Make request headers
for(headers_t::iterator iter = meta.begin(); iter != meta.end(); ++iter){
for(headers_t::const_iterator iter = meta.begin(); iter != meta.end(); ++iter){
std::string key = lower(iter->first);
std::string value = iter->second;
if(is_prefix(key.c_str(), "x-amz-acl")){
Expand Down Expand Up @@ -3868,7 +3858,7 @@ int S3fsCurl::ListBucketRequest(const char* tpath, const char* query)
// Date: Mon, 1 Nov 2010 20:34:56 GMT
// Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZCBieSBlbHZpbmc=
//
int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, std::string& upload_id, bool is_copy)
int S3fsCurl::PreMultipartPostRequest(const char* tpath, const headers_t& meta, std::string& upload_id, bool is_copy)
{
S3FS_PRN_INFO3("[tpath=%s]", SAFESTRPTR(tpath));

Expand All @@ -3892,7 +3882,7 @@ int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, std::s

std::string contype = S3fsCurl::LookupMimeType(tpath);

for(headers_t::iterator iter = meta.begin(); iter != meta.end(); ++iter){
for(headers_t::const_iterator iter = meta.begin(); iter != meta.end(); ++iter){
std::string key = lower(iter->first);
std::string value = iter->second;
if(is_prefix(key.c_str(), "x-amz-acl")){
Expand Down Expand Up @@ -3971,7 +3961,7 @@ int S3fsCurl::PreMultipartPostRequest(const char* tpath, headers_t& meta, std::s
return 0;
}

int S3fsCurl::CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, etaglist_t& parts)
int S3fsCurl::CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, const etaglist_t& parts)
{
S3FS_PRN_INFO3("[tpath=%s][parts=%zu]", SAFESTRPTR(tpath), parts.size());

Expand All @@ -3982,7 +3972,7 @@ int S3fsCurl::CompleteMultipartPostRequest(const char* tpath, const std::string&
// make contents
std::string postContent;
postContent += "<CompleteMultipartUpload>\n";
for(etaglist_t::iterator it = parts.begin(); it != parts.end(); ++it){
for(etaglist_t::const_iterator it = parts.begin(); it != parts.end(); ++it){
if(it->etag.empty()){
S3FS_PRN_ERR("%d file part is not finished uploading.", it->part_num);
return -EIO;
Expand Down Expand Up @@ -4446,11 +4436,8 @@ int S3fsCurl::MultipartHeadRequest(const char* tpath, off_t size, headers_t& met
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);

S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(result2 != 0){
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
Expand Down Expand Up @@ -4541,11 +4528,8 @@ int S3fsCurl::MultipartRenameRequest(const char* from, const char* to, headers_t
// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);

S3fsCurl s3fscurl_abort(true);
int result2 = s3fscurl_abort.AbortMultipartUpload(to, upload_id);
s3fscurl_abort.DestroyCurlHandle();
if(result2 != 0){
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(to), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
Expand Down
8 changes: 4 additions & 4 deletions src/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class S3fsCurl
static bool InitMimeType(const std::string& strFile);
static bool DestroyS3fsCurl();
static std::unique_ptr<S3fsCurl> CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result);
static int ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd);
static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd);
static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages);
static int ParallelGetObjectRequest(const char* tpath, int fd, off_t start, off_t size);

Expand Down Expand Up @@ -374,14 +374,14 @@ class S3fsCurl
return PreHeadRequest(tpath.c_str(), bpath.c_str(), savedpath.c_str(), ssekey_pos);
}
int HeadRequest(const char* tpath, headers_t& meta);
int PutHeadRequest(const char* tpath, headers_t& meta, bool is_copy);
int PutHeadRequest(const char* tpath, const headers_t& meta, bool is_copy);
int PutRequest(const char* tpath, headers_t& meta, int fd);
int PreGetObjectRequest(const char* tpath, int fd, off_t start, off_t size, sse_type_t ssetype, const std::string& ssevalue);
int GetObjectRequest(const char* tpath, int fd, off_t start = -1, off_t size = -1);
int CheckBucket(const char* check_path, bool compat_dir, bool force_no_sse);
int ListBucketRequest(const char* tpath, const char* query);
int PreMultipartPostRequest(const char* tpath, headers_t& meta, std::string& upload_id, bool is_copy);
int CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, etaglist_t& parts);
int PreMultipartPostRequest(const char* tpath, const headers_t& meta, std::string& upload_id, bool is_copy);
int CompleteMultipartPostRequest(const char* tpath, const std::string& upload_id, const etaglist_t& parts);
int UploadMultipartPostRequest(const char* tpath, int part_num, const std::string& upload_id);
bool MixMultipartPostComplete();
int MultipartListRequest(std::string& body);
Expand Down
Loading

0 comments on commit ef023ad

Please sign in to comment.