Skip to content

Commit

Permalink
Removed last use of S3fsMultiCurl and changed those thread handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ggtakec committed Nov 12, 2024
1 parent 814201e commit fab5871
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 827 deletions.
1 change: 0 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ s3fs_SOURCES = \
metaheader.cpp \
mpu_util.cpp \
curl.cpp \
curl_multi.cpp \
curl_util.cpp \
s3objlist.cpp \
cache.cpp \
Expand Down
317 changes: 2 additions & 315 deletions src/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "s3fs.h"
#include "s3fs_logger.h"
#include "curl.h"
#include "curl_multi.h"
#include "curl_util.h"
#include "s3fs_auth.h"
#include "s3fs_cred.h"
Expand Down Expand Up @@ -1219,132 +1218,6 @@ bool S3fsCurl::SetIPResolveType(const char* value)
return true;
}

// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameter
// cppcheck-suppress constParameterCallback
bool S3fsCurl::MultipartUploadPartCallback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl || param){ // this callback does not need a parameter
return false;
}
return s3fscurl->MultipartUploadPartComplete();
}

// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress constParameter
// cppcheck-suppress constParameterCallback
bool S3fsCurl::MixMultipartUploadCallback(S3fsCurl* s3fscurl, void* param)
{
if(!s3fscurl || param){ // this callback does not need a parameter
return false;
}
return s3fscurl->MultipartUploadPartComplete();
}

std::unique_ptr<S3fsCurl> S3fsCurl::MultipartUploadPartRetryCallback(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return nullptr;
}
// parse and get part_num, upload_id.
std::string upload_id;
std::string part_num_str;
int part_num;
off_t tmp_part_num = 0;
if(!get_keyword_value(s3fscurl->url, "uploadId", upload_id)){
return nullptr;
}
upload_id = urlDecode(upload_id); // decode
if(!get_keyword_value(s3fscurl->url, "partNumber", part_num_str)){
return nullptr;
}
if(!s3fs_strtoofft(&tmp_part_num, part_num_str.c_str(), /*base=*/ 10)){
return nullptr;
}
part_num = static_cast<int>(tmp_part_num);

if(s3fscurl->retry_count >= S3fsCurl::retries){
S3FS_PRN_ERR("Over retry count(%d) limit(%s:%d).", s3fscurl->retry_count, s3fscurl->path.c_str(), part_num);
return nullptr;
}

// duplicate request
std::unique_ptr<S3fsCurl> newcurl(new S3fsCurl(s3fscurl->IsUseAhbe()));
newcurl->partdata.petag = s3fscurl->partdata.petag;
newcurl->partdata.fd = s3fscurl->partdata.fd;
newcurl->partdata.startpos = s3fscurl->b_partdata_startpos;
newcurl->partdata.size = s3fscurl->b_partdata_size;
newcurl->b_partdata_startpos = s3fscurl->b_partdata_startpos;
newcurl->b_partdata_size = s3fscurl->b_partdata_size;
newcurl->retry_count = s3fscurl->retry_count + 1;
newcurl->op = s3fscurl->op;
newcurl->type = s3fscurl->type;

// setup new curl object
if(0 != newcurl->MultipartUploadContentPartSetup(s3fscurl->path.c_str(), part_num, upload_id)){
S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
return nullptr;
}
return newcurl;
}

std::unique_ptr<S3fsCurl> S3fsCurl::CopyMultipartUploadRetryCallback(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return nullptr;
}
// parse and get part_num, upload_id.
std::string upload_id;
std::string part_num_str;
int part_num;
off_t tmp_part_num = 0;
if(!get_keyword_value(s3fscurl->url, "uploadId", upload_id)){
return nullptr;
}
upload_id = urlDecode(upload_id); // decode
if(!get_keyword_value(s3fscurl->url, "partNumber", part_num_str)){
return nullptr;
}
if(!s3fs_strtoofft(&tmp_part_num, part_num_str.c_str(), /*base=*/ 10)){
return nullptr;
}
part_num = static_cast<int>(tmp_part_num);

if(s3fscurl->retry_count >= S3fsCurl::retries){
S3FS_PRN_ERR("Over retry count(%d) limit(%s:%d).", s3fscurl->retry_count, s3fscurl->path.c_str(), part_num);
return nullptr;
}

// duplicate request
std::unique_ptr<S3fsCurl> newcurl(new S3fsCurl(s3fscurl->IsUseAhbe()));
newcurl->partdata.petag = s3fscurl->partdata.petag;
newcurl->b_from = s3fscurl->b_from;
newcurl->b_meta = s3fscurl->b_meta;
newcurl->retry_count = s3fscurl->retry_count + 1;
newcurl->op = s3fscurl->op;
newcurl->type = s3fscurl->type;

// setup new curl object
if(0 != newcurl->MultipartUploadCopyPartSetup(s3fscurl->b_from.c_str(), s3fscurl->path.c_str(), part_num, upload_id, s3fscurl->b_meta)){
S3FS_PRN_ERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
return nullptr;
}
return newcurl;
}

std::unique_ptr<S3fsCurl> S3fsCurl::MixMultipartUploadRetryCallback(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
return nullptr;
}

if(-1 == s3fscurl->partdata.fd){
return S3fsCurl::CopyMultipartUploadRetryCallback(s3fscurl);
}else{
return S3fsCurl::MultipartUploadPartRetryCallback(s3fscurl);
}
}

int S3fsCurl::MapPutErrorResponse(int result)
{
if(result != 0){
Expand Down Expand Up @@ -1372,185 +1245,6 @@ int S3fsCurl::MapPutErrorResponse(int result)
return result;
}

int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd)
{
int result;
std::string upload_id;
struct stat st;
etaglist_t list;
off_t remaining_bytes;

S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

if(-1 == fstat(fd, &st)){
S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno);
return -errno;
}

if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){
return result;
}

// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::MultipartUploadPartCallback);
curlmulti.SetRetryCallback(S3fsCurl::MultipartUploadPartRetryCallback);

// cycle through open fd, pulling off 10MB chunks at a time
for(remaining_bytes = st.st_size; 0 < remaining_bytes; ){
off_t chunk = remaining_bytes > S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes;

// s3fscurl sub object
std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));
s3fscurl_para->partdata.fd = fd;
s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes;
s3fscurl_para->partdata.size = chunk;
s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size;
s3fscurl_para->partdata.add_etag_list(list);

// initiate upload part for parallel
if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}

// set into parallel object
if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
return -EIO;
}
remaining_bytes -= chunk;
}

// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
}

if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){
return result;
}
return 0;
}

int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages)
{
int result;
std::string upload_id;
struct stat st;
etaglist_t list;

S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

if(-1 == fstat(fd, &st)){
S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno);
return -errno;
}

if(0 != (result = pre_multipart_upload_request(std::string(tpath), meta, upload_id))){
return result;
}

// for copy multipart
std::string srcresource;
std::string srcurl;
MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl);
meta["Content-Type"] = S3fsCurl::LookupMimeType(tpath);
meta["x-amz-copy-source"] = srcresource;

// Initialize S3fsMultiCurl
S3fsMultiCurl curlmulti(GetMaxParallelCount());
curlmulti.SetSuccessCallback(S3fsCurl::MixMultipartUploadCallback);
curlmulti.SetRetryCallback(S3fsCurl::MixMultipartUploadRetryCallback);

for(auto iter = mixuppages.cbegin(); iter != mixuppages.cend(); ++iter){
if(iter->modified){
// Multipart upload
std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));
s3fscurl_para->partdata.fd = fd;
s3fscurl_para->partdata.startpos = iter->offset;
s3fscurl_para->partdata.size = iter->bytes;
s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
s3fscurl_para->b_partdata_size = s3fscurl_para->partdata.size;
s3fscurl_para->partdata.add_etag_list(list);

S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset), static_cast<long long>(iter->bytes), s3fscurl_para->partdata.get_part_number());

// initiate upload part for parallel
if(0 != (result = s3fscurl_para->MultipartUploadContentPartSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}

// set into parallel object
if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
return -EIO;
}
}else{
// Multipart copy
for(off_t i = 0, bytes = 0; i < iter->bytes; i += bytes){
std::unique_ptr<S3fsCurl> s3fscurl_para(new S3fsCurl(true));

bytes = std::min(GetMultipartCopySize(), iter->bytes - i);
/* every part should be larger than MIN_MULTIPART_SIZE and smaller than FIVE_GB */
off_t remain_bytes = iter->bytes - i - bytes;

if ((MIN_MULTIPART_SIZE > remain_bytes) && (0 < remain_bytes)){
if(FIVE_GB < (bytes + remain_bytes)){
bytes = (bytes + remain_bytes)/2;
} else{
bytes += remain_bytes;
}
}

std::ostringstream strrange;
strrange << "bytes=" << (iter->offset + i) << "-" << (iter->offset + i + bytes - 1);
meta["x-amz-copy-source-range"] = strrange.str();

s3fscurl_para->b_from = SAFESTRPTR(tpath);
s3fscurl_para->b_meta = meta;
s3fscurl_para->partdata.add_etag_list(list);

S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset + i), static_cast<long long>(bytes), s3fscurl_para->partdata.get_part_number());

// initiate upload part for parallel
if(0 != (result = s3fscurl_para->MultipartUploadCopyPartSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
S3FS_PRN_ERR("failed uploading part setup(%d)", result);
return result;
}

// set into parallel object
if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl_para))){
S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
return -EIO;
}
}
}
}

// Multi request
if(0 != (result = curlmulti.Request())){
S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);
int result2;
if(0 != (result2 = abort_multipart_upload_request(std::string(tpath), upload_id))){
S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
}
return result;
}

if(0 != (result = complete_multipart_upload_request(std::string(tpath), upload_id, list))){
return result;
}
return 0;
}

bool S3fsCurl::MultipartUploadPartSetCurlOpts(S3fsCurl* s3fscurl)
{
if(!s3fscurl){
Expand Down Expand Up @@ -1794,8 +1488,7 @@ S3fsCurl::S3fsCurl(bool ahbe) :
type(REQTYPE::UNSET), requestHeaders(nullptr),
LastResponseCode(S3FSCURL_RESPONSECODE_NOTSET), postdata(nullptr), postdata_remaining(0), is_use_ahbe(ahbe),
retry_count(0), b_postdata(nullptr), b_postdata_remaining(0), b_partdata_startpos(0), b_partdata_size(0),
b_ssekey_pos(-1), b_ssetype(sse_type_t::SSE_DISABLE),
sem(nullptr), completed_tids_lock(nullptr), completed_tids(nullptr), fpLazySetup(nullptr), curlCode(CURLE_OK)
fpLazySetup(nullptr), curlCode(CURLE_OK)
{
if(!S3fsCurl::ps3fscred){
S3FS_PRN_CRIT("The object of S3fs Credential class is not initialized.");
Expand Down Expand Up @@ -2078,7 +1771,7 @@ bool S3fsCurl::RemakeHandle()
headdata.clear();
LastResponseCode = S3FSCURL_RESPONSECODE_NOTSET;

// count up(only use for multipart)
// retry count up
retry_count++;

// set from backup
Expand Down Expand Up @@ -3194,7 +2887,6 @@ bool S3fsCurl::PreHeadRequest(const char* tpath, size_t ssekey_pos)
return false;
}
}
b_ssekey_pos = ssekey_pos;

op = "HEAD";
type = REQTYPE::HEAD;
Expand Down Expand Up @@ -3533,9 +3225,6 @@ int S3fsCurl::PreGetObjectRequest(const char* tpath, int fd, off_t start, off_t
partdata.size = size;
b_partdata_startpos = start;
b_partdata_size = size;
b_ssetype = ssetype;
b_ssevalue = ssevalue;
b_ssekey_pos = -1; // not use this value for get object.

return 0;
}
Expand Down Expand Up @@ -3849,8 +3538,6 @@ int S3fsCurl::MultipartUploadPartSetup(const char* tpath, int upload_fd, off_t s
strrange << "bytes=" << start << "-" << (start + size - 1);
meta["x-amz-copy-source-range"] = strrange.str();

b_from = SAFESTRPTR(tpath);
b_meta = meta;
partdata.set_etag(petag); // [NOTE] be careful, the value is set directly

S3FS_PRN_INFO3("Copy Part [path=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long int>(start), static_cast<long long int>(size), part_num);
Expand Down
Loading

0 comments on commit fab5871

Please sign in to comment.