Skip to content

Commit

Permalink
Checked and fixed the internal data lock of PseudoFdInfo class
Browse files Browse the repository at this point in the history
  • Loading branch information
ggtakec committed Jul 21, 2024
1 parent 411e423 commit d8df82a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 40 deletions.
99 changes: 61 additions & 38 deletions src/fdcache_fdinfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ bool PseudoFdInfo::Clear()
return true;
}

bool PseudoFdInfo::IsUploadingHasLock() const
{
return !upload_id.empty();
}

bool PseudoFdInfo::IsUploading() const
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
return IsUploadingHasLock();
}

void PseudoFdInfo::CloseUploadFd()
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
Expand All @@ -157,6 +168,8 @@ void PseudoFdInfo::CloseUploadFd()

bool PseudoFdInfo::OpenUploadFd()
{
const std::lock_guard<std::mutex> lock(upload_list_lock);

if(-1 != upload_fd){
// already initialized
return true;
Expand Down Expand Up @@ -287,7 +300,9 @@ bool PseudoFdInfo::CompleteInstruction(int result)

bool PseudoFdInfo::GetUploadId(std::string& id) const
{
if(!IsUploading()){
const std::lock_guard<std::mutex> lock(upload_list_lock);

if(!IsUploadingHasLock()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}
Expand All @@ -297,13 +312,13 @@ bool PseudoFdInfo::GetUploadId(std::string& id) const

bool PseudoFdInfo::GetEtaglist(etaglist_t& list) const
{
if(!IsUploading()){
const std::lock_guard<std::mutex> lock(upload_list_lock);

if(!IsUploadingHasLock()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}

const std::lock_guard<std::mutex> lock(upload_list_lock);

list.clear();
for(filepart_list_t::const_iterator iter = upload_list.begin(); iter != upload_list.end(); ++iter){
if(iter->petag){
Expand All @@ -325,12 +340,13 @@ bool PseudoFdInfo::GetEtaglist(etaglist_t& list) const
//
bool PseudoFdInfo::AppendUploadPart(off_t start, off_t size, bool is_copy, etagpair** ppetag)
{
if(!IsUploading()){
const std::lock_guard<std::mutex> lock(upload_list_lock);

if(!IsUploadingHasLock()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}

const std::lock_guard<std::mutex> lock(upload_list_lock);
off_t next_start_pos = 0;
if(!upload_list.empty()){
next_start_pos = upload_list.back().startpos + upload_list.back().size;
Expand Down Expand Up @@ -365,9 +381,11 @@ static bool filepart_partnum_compare(const filepart& src1, const filepart& src2)

bool PseudoFdInfo::InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag)
{
const std::lock_guard<std::mutex> lock(upload_list_lock);

//S3FS_PRN_DBG("[start=%lld][size=%lld][part_num=%d][is_copy=%s]", static_cast<long long int>(start), static_cast<long long int>(size), part_num, (is_copy ? "true" : "false"));

if(!IsUploading()){
if(!IsUploadingHasLock()){
S3FS_PRN_ERR("Multipart Upload has not started yet.");
return false;
}
Expand Down Expand Up @@ -405,6 +423,11 @@ bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_
return false;
}

std::string tmp_upload_id;
if(!GetUploadId(tmp_upload_id)){
return false;
}

for(mp_part_list_t::const_iterator iter = mplist.begin(); iter != mplist.end(); ++iter){
// Insert upload part
etagpair* petag = nullptr;
Expand All @@ -417,7 +440,7 @@ bool PseudoFdInfo::ParallelMultipartUpload(const char* path, const mp_part_list_
pseudofdinfo_thparam* thargs = new pseudofdinfo_thparam;
thargs->ppseudofdinfo = this;
thargs->path = SAFESTRPTR(path);
thargs->upload_id = upload_id;
thargs->upload_id = tmp_upload_id;
thargs->upload_fd = upload_fd;
thargs->start = iter->start;
thargs->size = iter->size;
Expand Down Expand Up @@ -448,16 +471,12 @@ bool PseudoFdInfo::ParallelMultipartUploadAll(const char* path, const mp_part_li

result = 0;

{
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(!OpenUploadFd()){
return false;
}

if(!ParallelMultipartUpload(path, to_upload_list, false) || !ParallelMultipartUpload(path, copy_list, true)){
S3FS_PRN_ERR("Failed setup instruction for uploading(path=%s, to_upload_list=%zu, copy_list=%zu).", SAFESTRPTR(path), to_upload_list.size(), copy_list.size());
return false;
}
if(!OpenUploadFd()){
return false;
}
if(!ParallelMultipartUpload(path, to_upload_list, false) || !ParallelMultipartUpload(path, copy_list, true)){
S3FS_PRN_ERR("Failed setup instruction for uploading(path=%s, to_upload_list=%zu, copy_list=%zu).", SAFESTRPTR(path), to_upload_list.size(), copy_list.size());
return false;
}

// Wait for all thread exiting
Expand Down Expand Up @@ -695,28 +714,32 @@ bool PseudoFdInfo::ExtractUploadPartsFromUntreatedArea(const off_t& untreated_st
// Also, it is assumed that it must not be a copy area.
// So if the areas overlap, include uploaded area as an untreated area.
//
for(filepart_list_t::iterator cur_iter = upload_list.begin(); cur_iter != upload_list.end(); /* ++cur_iter */){
// Check overlap
if((cur_iter->startpos + cur_iter->size - 1) < aligned_start || (aligned_start + aligned_size - 1) < cur_iter->startpos){
// Areas do not overlap
++cur_iter;
{
const std::lock_guard<std::mutex> lock(upload_list_lock);

}else{
// The areas overlap
//
// Since the start position of the uploaded area is aligned with the boundary,
// it is not necessary to check the start position.
// If the uploaded area exceeds the untreated area, expand the untreated area.
//
if((aligned_start + aligned_size - 1) < (cur_iter->startpos + cur_iter->size - 1)){
aligned_size += (cur_iter->startpos + cur_iter->size) - (aligned_start + aligned_size);
}
for(filepart_list_t::iterator cur_iter = upload_list.begin(); cur_iter != upload_list.end(); /* ++cur_iter */){
// Check overlap
if((cur_iter->startpos + cur_iter->size - 1) < aligned_start || (aligned_start + aligned_size - 1) < cur_iter->startpos){
// Areas do not overlap
++cur_iter;

//
// Add this to cancel list
//
cancel_upload_list.push_back(*cur_iter); // Copy and Push to cancel list
cur_iter = upload_list.erase(cur_iter);
}else{
// The areas overlap
//
// Since the start position of the uploaded area is aligned with the boundary,
// it is not necessary to check the start position.
// If the uploaded area exceeds the untreated area, expand the untreated area.
//
if((aligned_start + aligned_size - 1) < (cur_iter->startpos + cur_iter->size - 1)){
aligned_size += (cur_iter->startpos + cur_iter->size) - (aligned_start + aligned_size);
}

//
// Add this to cancel list
//
cancel_upload_list.push_back(*cur_iter); // Copy and Push to cancel list
cur_iter = upload_list.erase(cur_iter);
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/fdcache_fdinfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ class PseudoFdInfo
bool ResetUploadInfo() REQUIRES(upload_list_lock);
bool RowInitialUploadInfo(const std::string& id, bool is_cancel_mp);
bool CompleteInstruction(int result) REQUIRES(upload_list_lock);
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy) REQUIRES(upload_list_lock);
bool ParallelMultipartUpload(const char* path, const mp_part_list_t& mplist, bool is_copy);
bool InsertUploadPart(off_t start, off_t size, int part_num, bool is_copy, etagpair** ppetag) REQUIRES(upload_list_lock);
bool CancelAllThreads();
bool ExtractUploadPartsFromUntreatedArea(const off_t& untreated_start, const off_t& untreated_size, mp_part_list_t& to_upload_list, filepart_list_t& cancel_upload_list, off_t max_mp_size);
bool IsUploadingHasLock() const REQUIRES(upload_list_lock);

public:
explicit PseudoFdInfo(int fd = -1, int open_flags = 0);
Expand All @@ -104,7 +105,7 @@ class PseudoFdInfo
bool ClearUploadInfo(bool is_cancel_mp = false);
bool InitialUploadInfo(const std::string& id){ return RowInitialUploadInfo(id, true); }

bool IsUploading() const { return !upload_id.empty(); }
bool IsUploading() const;
bool GetUploadId(std::string& id) const;
bool GetEtaglist(etaglist_t& list) const;

Expand Down

0 comments on commit d8df82a

Please sign in to comment.