Skip to content

Commit

Permalink
Refactored for standardizing content and copy handling for Multipart …
Browse files Browse the repository at this point in the history
…Upload
  • Loading branch information
ggtakec committed Nov 9, 2024
1 parent 4693ac3 commit f8dc315
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 361 deletions.
226 changes: 98 additions & 128 deletions src/curl.cpp

Large diffs are not rendered by default.

15 changes: 7 additions & 8 deletions src/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ class S3fsCurl
}
std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token);
std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token);
int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id);
int CopyMultipartUploadSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta);
bool MultipartUploadPartComplete();
bool CopyMultipartUploadComplete();
int MultipartUploadContentPartSetup(const char* tpath, int part_num, const std::string& upload_id);
int MultipartUploadCopyPartSetup(const char* from, const char* to, int part_num, const std::string& upload_id, const headers_t& meta);
bool MultipartUploadContentPartComplete();
bool MultipartUploadCopyPartComplete();
int MapPutErrorResponse(int result);

public:
Expand All @@ -289,7 +289,6 @@ class S3fsCurl
static bool InitCredentialObject(S3fsCred* pcredobj);
static bool InitMimeType(const std::string& strFile);
static bool DestroyS3fsCurl();
static std::unique_ptr<S3fsCurl> CreateParallelS3fsCurl(const char* tpath, int fd, off_t start, off_t size, int part_num, bool is_copy, etagpair* petag, const std::string& upload_id, int& result);
static int ParallelMultipartUploadRequest(const char* tpath, const headers_t& meta, int fd);
static int ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages);

Expand Down Expand Up @@ -377,13 +376,13 @@ class S3fsCurl
int CheckBucket(const char* check_path, bool compat_dir, bool force_no_sse);
int ListBucketRequest(const char* tpath, const char* query);
int PreMultipartUploadRequest(const char* tpath, const headers_t& meta, std::string& upload_id);
int MultipartUploadPartSetup(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy);
int MultipartUploadComplete(const char* tpath, const std::string& upload_id, const etaglist_t& parts);
int MultipartUploadPartRequest(const char* tpath, int part_num, const std::string& upload_id);
bool MixMultipartUploadComplete();
bool MultipartUploadPartComplete();
int MultipartListRequest(std::string& body);
int AbortMultipartUpload(const char* tpath, const std::string& upload_id);
int MultipartPutHeadRequest(const std::string& from, const std::string& to, int part_number, const std::string& upload_id, const headers_t& meta);
int MultipartUploadRequest(const std::string& upload_id, const char* tpath, int fd, off_t offset, off_t size, etagpair* petagpair);
int MultipartUploadPartRequest(const char* tpath, int upload_fd, off_t start, off_t size, int part_num, const std::string& upload_id, etagpair* petag, bool is_copy);

// methods(variables)
const std::string& GetPath() const { return path; }
Expand Down
90 changes: 11 additions & 79 deletions src/fdcache_entity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,6 @@
//------------------------------------------------
static constexpr int MAX_MULTIPART_CNT = 10 * 1000; // S3 multipart max count

//------------------------------------------------
// Structure of parameters to pass to thread
//------------------------------------------------
//
// Multipart Upload Request parameter structure for Thread Pool.
//
// ([TODO] This is a temporary structure is moved when S3fsMultiCurl is deprecated.)
//
struct multipart_upload_req_thparam
{
std::string path;
std::string upload_id;
int fd = -1;
off_t start = 0;
off_t size = 0;
etagpair* petagpair = nullptr;
int result = 0;
};

//------------------------------------------------
// FdEntity class variables
//------------------------------------------------
Expand Down Expand Up @@ -129,25 +110,6 @@ ino_t FdEntity::GetInode(int fd)
return st.st_ino;
}

//
// Worker function for multipart upload request
//
// ([TODO] This is a temporary structure is moved when S3fsMultiCurl is deprecated.)
//
void* FdEntity::MultipartUploadThreadWorker(void* arg)
{
auto* pthparam = static_cast<multipart_upload_req_thparam*>(arg);
if(!pthparam){
return reinterpret_cast<void*>(-EIO);
}
S3FS_PRN_INFO3("Multipart Upload Request [path=%s][upload id=%s][fd=%d][start=%lld][size=%lld][etagpair=%p]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->fd, static_cast<long long>(pthparam->start), static_cast<long long>(pthparam->size), pthparam->petagpair);

S3fsCurl s3fscurl(true);
pthparam->result = s3fscurl.MultipartUploadRequest(pthparam->upload_id, pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, pthparam->petagpair);

return reinterpret_cast<void*>(pthparam->result);
}

//------------------------------------------------
// FdEntity methods
//------------------------------------------------
Expand Down Expand Up @@ -1260,9 +1222,6 @@ int FdEntity::NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start, off_t si
return result;
}

//
// Common method that calls S3fsCurl::PreMultipartUploadRequest via pre_multipart_upload_request
//
// [NOTE]
// If the request is successful, initialize upload_id.
//
Expand All @@ -1273,19 +1232,11 @@ int FdEntity::PreMultipartUploadRequest(PseudoFdInfo* pseudo_obj)
return -EIO;
}

// get upload_id
std::string upload_id;
int result;
if(0 != (result = pre_multipart_upload_request(path, orgmeta, upload_id))){
int result;
if(0 != (result = pseudo_obj->PreMultipartUploadRequest(path, orgmeta))){
return result;
}

// reset upload_id
if(!pseudo_obj->InitialUploadInfo(upload_id)){
S3FS_PRN_ERR("failed to initialize upload id(%s)", upload_id.c_str());
return -EIO;
}

// Clear the dirty flag, because the meta data is updated.
pending_status = pending_status_t::NO_UPDATE_PENDING;

Expand Down Expand Up @@ -1318,49 +1269,30 @@ int FdEntity::NoCachePreMultipartUploadRequest(PseudoFdInfo* pseudo_obj)
// At no disk space for caching object.
// This method is uploading one part of multipart.
//
// ([TODO] This is a temporary modification till S3fsMultiCurl is deprecated.)
//
int FdEntity::NoCacheMultipartUploadRequest(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size)
{
if(-1 == tgfd || !pseudo_obj || !pseudo_obj->IsUploading()){
S3FS_PRN_ERR("Need to initialize for multipart upload.");
return -EIO;
}

// parameter for thread worker
multipart_upload_req_thparam thargs;
thargs.path = path;
thargs.upload_id.clear();
thargs.fd = tgfd;
thargs.start = start;
thargs.size = size;
thargs.petagpair = nullptr;
thargs.result = 0;

// get upload id
if(!pseudo_obj->GetUploadId(thargs.upload_id)){
std::string upload_id;
if(!pseudo_obj->GetUploadId(upload_id)){
return -EIO;
}

// append new part and get it's etag string pointer
if(!pseudo_obj->AppendUploadPart(start, size, false, &(thargs.petagpair))){
etagpair* petag = nullptr;
if(!pseudo_obj->AppendUploadPart(start, size, false, &petag)){
return -EIO;
}

// make parameter for thread pool
thpoolman_param ppoolparam;
ppoolparam.args = &thargs;
ppoolparam.psem = nullptr; // case await
ppoolparam.pfunc = FdEntity::MultipartUploadThreadWorker;

// send request by thread
if(!ThreadPoolMan::AwaitInstruct(ppoolparam)){
S3FS_PRN_ERR("failed to setup Get Object Request Thread Worker");
return -EIO;
}
if(0 != thargs.result){
S3FS_PRN_ERR("Multipart Upload Request(path=%s, upload_id=%s, fd=%d, start=%lld, size=%lld) returns with error(%d)", path.c_str(), thargs.upload_id.c_str(), tgfd, static_cast<long long int>(start), static_cast<long long int>(size), thargs.result);
return thargs.result;
// request to thread
int result;
if(0 != (result = await_multipart_upload_part_request(path, tgfd, start, size, petag->part_num, upload_id, petag, false))){
S3FS_PRN_ERR("Failed No Cache Multipart Upload Part Request by error(%d) [path=%s][upload_id=%s][fd=%d][start=%lld][size=%lld]", result, path.c_str(), upload_id.c_str(), tgfd, static_cast<long long int>(start), static_cast<long long int>(size));
return result;
}
return 0;
}
Expand Down
1 change: 0 additions & 1 deletion src/fdcache_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class FdEntity : public std::enable_shared_from_this<FdEntity>
private:
static int FillFile(int fd, unsigned char byte, off_t size, off_t start);
static ino_t GetInode(int fd);
static void* MultipartUploadThreadWorker(void* arg); // ([TODO] This is a temporary method is moved when S3fsMultiCurl is deprecated.)

void Clear();
ino_t GetInode() const REQUIRES(FdEntity::fdent_data_lock);
Expand Down
Loading

0 comments on commit f8dc315

Please sign in to comment.