Skip to content

Commit

Permalink
[Improvement s3fs-fuse#2490] Add GUARDED_BY to FdEntity and fix locking
Browse files Browse the repository at this point in the history
  • Loading branch information
ggtakec committed Jul 14, 2024
1 parent 44d5b5e commit ea9589d
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 66 deletions.
45 changes: 30 additions & 15 deletions src/fdcache_entity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,11 @@ bool FdEntity::IsUploading()
//
int FdEntity::Open(const headers_t* pmeta, off_t size, const struct timespec& ts_mctime, int flags)
{
S3FS_PRN_DBG("[path=%s][physical_fd=%d][size=%lld][ts_mctime=%s][flags=0x%x]", path.c_str(), physical_fd, static_cast<long long>(size), str(ts_mctime).c_str(), flags);

const std::lock_guard<std::mutex> lock(fdent_lock);
const std::lock_guard<std::mutex> data_lock(fdent_data_lock);

S3FS_PRN_DBG("[path=%s][physical_fd=%d][size=%lld][ts_mctime=%s][flags=0x%x]", path.c_str(), physical_fd, static_cast<long long>(size), str(ts_mctime).c_str(), flags);

// [NOTE]
// When the file size is incremental by truncating, it must be keeped
// as an untreated area, and this area is set to these variables.
Expand Down Expand Up @@ -690,6 +690,9 @@ bool FdEntity::LoadAll(int fd, headers_t* pmeta, off_t* size, bool force_load)
//
bool FdEntity::RenamePath(const std::string& newpath, std::string& fentmapkey)
{
const std::lock_guard<std::mutex> lock(fdent_lock);
const std::lock_guard<std::mutex> data_lock(fdent_data_lock);

if(!cachepath.empty()){
// has cache path

Expand Down Expand Up @@ -837,6 +840,7 @@ bool FdEntity::UpdateAtime()
bool FdEntity::UpdateMtime(bool clear_holding_mtime)
{
const std::lock_guard<std::mutex> lock(fdent_lock);
const std::lock_guard<std::mutex> data_lock(fdent_data_lock);

if(0 <= holding_mtime.tv_sec){
// [NOTE]
Expand Down Expand Up @@ -872,9 +876,10 @@ bool FdEntity::UpdateMtime(bool clear_holding_mtime)

bool FdEntity::SetHoldingMtime(struct timespec mtime)
{
S3FS_PRN_INFO3("[path=%s][physical_fd=%d][mtime=%s]", path.c_str(), physical_fd, str(mtime).c_str());

const std::lock_guard<std::mutex> lock(fdent_lock);
const std::lock_guard<std::mutex> data_lock(fdent_data_lock);

S3FS_PRN_INFO3("[path=%s][physical_fd=%d][mtime=%s]", path.c_str(), physical_fd, str(mtime).c_str());

if(mtime.tv_sec < 0){
return false;
Expand Down Expand Up @@ -1346,7 +1351,7 @@ off_t FdEntity::BytesModified()
// Files smaller than the minimum part size will not be multipart uploaded,
// but will be uploaded as single part(normally).
//
int FdEntity::RowFlushHasLock(int fd, const char* tpath, bool force_sync)
int FdEntity::RowFlushWithoutLock(int fd, const char* tpath, bool force_sync)
{
S3FS_PRN_INFO3("[tpath=%s][path=%s][pseudo_fd=%d][physical_fd=%d]", SAFESTRPTR(tpath), path.c_str(), fd, physical_fd);

Expand All @@ -1365,8 +1370,6 @@ int FdEntity::RowFlushHasLock(int fd, const char* tpath, bool force_sync)
}
PseudoFdInfo* pseudo_obj = miter->second.get();

const std::lock_guard<std::mutex> data_lock(fdent_data_lock);

int result;
if(!force_sync && !pagelist.IsModified() && !IsDirtyMetadata()){
// nothing to update.
Expand Down Expand Up @@ -1940,10 +1943,10 @@ bool FdEntity::ReserveDiskSpace(off_t size)

ssize_t FdEntity::Read(int fd, char* bytes, off_t start, size_t size, bool force_load)
{
S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), fd, physical_fd, static_cast<long long int>(start), size);

const std::lock_guard<std::mutex> lock(fdent_lock);

S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), fd, physical_fd, static_cast<long long int>(start), size);

if(-1 == physical_fd || nullptr == CheckPseudoFdFlags(fd, false)){
S3FS_PRN_DBG("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not readable", fd, physical_fd, path.c_str());
return -EBADF;
Expand Down Expand Up @@ -2004,9 +2007,10 @@ ssize_t FdEntity::Read(int fd, char* bytes, off_t start, size_t size, bool force

ssize_t FdEntity::Write(int fd, const char* bytes, off_t start, size_t size)
{
const std::lock_guard<std::mutex> lock(fdent_lock);

S3FS_PRN_DBG("[path=%s][pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), fd, physical_fd, static_cast<long long int>(start), size);

const std::lock_guard<std::mutex> lock(fdent_lock);
PseudoFdInfo* pseudo_obj = nullptr;
if(-1 == physical_fd || nullptr == (pseudo_obj = CheckPseudoFdFlags(fd, false))){
S3FS_PRN_ERR("pseudo_fd(%d) to physical_fd(%d) for path(%s) is not opened or not writable", fd, physical_fd, path.c_str());
Expand Down Expand Up @@ -2348,6 +2352,7 @@ ssize_t FdEntity::WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes,
bool FdEntity::MergeOrgMeta(headers_t& updatemeta)
{
const std::lock_guard<std::mutex> lock(fdent_lock);
const std::lock_guard<std::mutex> data_lock(fdent_data_lock);

merge_headers(orgmeta, updatemeta, true); // overwrite all keys
// [NOTE]
Expand All @@ -2373,7 +2378,6 @@ bool FdEntity::MergeOrgMeta(headers_t& updatemeta)
SetAtimeHasLock(atime);
}

const std::lock_guard<std::mutex> data_lock(fdent_data_lock);
if(pending_status_t::NO_UPDATE_PENDING == pending_status && (IsUploading() || pagelist.IsModified())){
pending_status = pending_status_t::UPDATE_META_PENDING;
}
Expand Down Expand Up @@ -2410,7 +2414,7 @@ int FdEntity::UploadPendingHasLock(int fd)
S3FS_PRN_ERR("could not create a new file(%s), because fd is not specified.", path.c_str());
result = -EBADF;
}else{
result = FlushHasLock(fd, true);
result = RowFlushWithoutLock(fd, nullptr, true);
if(0 != result){
S3FS_PRN_ERR("failed to flush for file(%s) by(%d).", path.c_str(), result);
}else{
Expand Down Expand Up @@ -2456,11 +2460,11 @@ static int fallocate(int /*fd*/, int /*mode*/, off_t /*offset*/, off_t /*len*/)
//
bool FdEntity::PunchHole(off_t start, size_t size)
{
S3FS_PRN_DBG("[path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast<long long int>(start), size);

const std::lock_guard<std::mutex> lock(fdent_lock);
const std::lock_guard<std::mutex> data_lock(fdent_data_lock);

S3FS_PRN_DBG("[path=%s][physical_fd=%d][offset=%lld][size=%zu]", path.c_str(), physical_fd, static_cast<long long int>(start), size);

if(-1 == physical_fd){
return false;
}
Expand Down Expand Up @@ -2510,7 +2514,7 @@ void FdEntity::MarkDirtyNewFile()

bool FdEntity::IsDirtyNewFile() const
{
const std::lock_guard<std::mutex> lock(fdent_lock);
const std::lock_guard<std::mutex> lock(fdent_data_lock);

return (pending_status_t::CREATE_FILE_PENDING == pending_status);
}
Expand All @@ -2535,6 +2539,7 @@ bool FdEntity::IsDirtyMetadata() const
// [NOTE]
// fdent_lock must be previously locked.
//
// TODO: is the above true?
return (pending_status_t::UPDATE_META_PENDING == pending_status);
}

Expand All @@ -2550,6 +2555,11 @@ bool FdEntity::AddUntreated(off_t start, off_t size)
return result;
}

// [NOTE]
// An object that has already been locked with fdent_lock is passed to
// UploadBoundaryLastUntreatedArea(), which calls this method.
// Therefore, there is no need to lock fdent_lock in this method.
//
bool FdEntity::GetLastUpdateUntreatedPart(off_t& start, off_t& size) const
{
// Get last untreated area
Expand All @@ -2559,6 +2569,11 @@ bool FdEntity::GetLastUpdateUntreatedPart(off_t& start, off_t& size) const
return true;
}

// [NOTE]
// An object that has already been locked with fdent_lock is passed to
// UploadBoundaryLastUntreatedArea(), which calls this method.
// Therefore, there is no need to lock fdent_lock in this method.
//
bool FdEntity::ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size)
{
if(0 < front_size){
Expand Down
98 changes: 55 additions & 43 deletions src/fdcache_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,52 +53,52 @@ class FdEntity
static bool streamupload; // whether stream uploading.

mutable std::mutex fdent_lock;
std::string path; // object path
int physical_fd; // physical file(cache or temporary file) descriptor
UntreatedParts untreated_list; // list of untreated parts that have been written and not yet uploaded(for streamupload)
fdinfo_map_t pseudo_fd_map; // pseudo file descriptor information map
FILE* pfile; // file pointer(tmp file or cache file)
ino_t inode; // inode number for cache file
headers_t orgmeta; // original headers at opening
off_t size_orgmeta; // original file size in original headers
std::string path GUARDED_BY(fdent_lock); // object path
int physical_fd GUARDED_BY(fdent_lock); // physical file(cache or temporary file) descriptor
UntreatedParts untreated_list GUARDED_BY(fdent_lock); // list of untreated parts that have been written and not yet uploaded(for streamupload)
fdinfo_map_t pseudo_fd_map GUARDED_BY(fdent_lock); // pseudo file descriptor information map
FILE* pfile GUARDED_BY(fdent_lock); // file pointer(tmp file or cache file)
ino_t inode GUARDED_BY(fdent_lock); // inode number for cache file
headers_t orgmeta GUARDED_BY(fdent_lock); // original headers at opening
off_t size_orgmeta GUARDED_BY(fdent_lock); // original file size in original headers

mutable std::mutex fdent_data_lock;// protects the following members
PageList pagelist;
std::string cachepath; // local cache file path
// (if this is empty, does not load/save pagelist.)
std::string mirrorpath; // mirror file path to local cache file path
pending_status_t pending_status;// status for new file creation and meta update
struct timespec holding_mtime; // if mtime is updated while the file is open, it is set time_t value
PageList pagelist GUARDED_BY(fdent_data_lock);
std::string cachepath GUARDED_BY(fdent_data_lock); // local cache file path
// (if this is empty, does not load/save pagelist.)
std::string mirrorpath GUARDED_BY(fdent_data_lock); // mirror file path to local cache file path
pending_status_t pending_status GUARDED_BY(fdent_data_lock); // status for new file creation and meta update
struct timespec holding_mtime GUARDED_BY(fdent_data_lock); // if mtime is updated while the file is open, it is set time_t value

private:
static int FillFile(int fd, unsigned char byte, off_t size, off_t start);
static ino_t GetInode(int fd);

void Clear();
ino_t GetInode() const;
int OpenMirrorFile();
int NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start = 0, off_t size = 0); // size=0 means loading to end
ino_t GetInode() const REQUIRES(fdent_data_lock);
int OpenMirrorFile() REQUIRES(fdent_data_lock);
int NoCacheLoadAndPost(PseudoFdInfo* pseudo_obj, off_t start = 0, off_t size = 0) REQUIRES(fdent_lock, fdent_data_lock); // size=0 means loading to end
PseudoFdInfo* CheckPseudoFdFlags(int fd, bool writable) REQUIRES(FdEntity::fdent_lock);
bool IsUploading() REQUIRES(FdEntity::fdent_lock);
bool SetAllStatus(bool is_loaded); // [NOTE] not locking
bool SetAllStatusUnloaded() { return SetAllStatus(false); }
int NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj);
bool SetAllStatus(bool is_loaded) REQUIRES(fdent_lock, fdent_data_lock); // [NOTE] not locking
bool SetAllStatusUnloaded() REQUIRES(fdent_lock, fdent_data_lock) { return SetAllStatus(false); }
int NoCachePreMultipartPost(PseudoFdInfo* pseudo_obj) REQUIRES(fdent_lock, fdent_data_lock);
int NoCacheMultipartPost(PseudoFdInfo* pseudo_obj, int tgfd, off_t start, off_t size);
int NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj);
int RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tpath);
int RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
int RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
int RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpath);
ssize_t WriteNoMultipart(const PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
ssize_t WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
ssize_t WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
ssize_t WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size);
int NoCacheCompleteMultipartPost(PseudoFdInfo* pseudo_obj) REQUIRES(fdent_lock);
int RowFlushNoMultipart(const PseudoFdInfo* pseudo_obj, const char* tpath) REQUIRES(fdent_lock, fdent_data_lock);
int RowFlushMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) REQUIRES(fdent_lock, fdent_data_lock);
int RowFlushMixMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) REQUIRES(fdent_lock, fdent_data_lock);
int RowFlushStreamMultipart(PseudoFdInfo* pseudo_obj, const char* tpath) REQUIRES(fdent_lock, fdent_data_lock);
ssize_t WriteNoMultipart(const PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size) REQUIRES(fdent_lock, fdent_data_lock);
ssize_t WriteMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size) REQUIRES(fdent_lock, fdent_data_lock);
ssize_t WriteMixMultipart(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size) REQUIRES(fdent_lock, fdent_data_lock);
ssize_t WriteStreamUpload(PseudoFdInfo* pseudo_obj, const char* bytes, off_t start, size_t size) REQUIRES(fdent_lock, fdent_data_lock);

bool ReserveDiskSpace(off_t size);
bool ReserveDiskSpace(off_t size) REQUIRES(fdent_lock, fdent_data_lock);

bool AddUntreated(off_t start, off_t size);
bool AddUntreated(off_t start, off_t size) REQUIRES(fdent_lock);

bool IsDirtyMetadata() const;
bool IsDirtyMetadata() const REQUIRES(fdent_data_lock);

public:
static bool GetNoMixMultipart() { return mixmultipart; }
Expand All @@ -114,8 +114,10 @@ class FdEntity
FdEntity& operator=(FdEntity&&) = delete;

void Close(int fd);
// TODO: should this require a lock?
bool IsOpen() const { return (-1 != physical_fd); }
bool IsOpen() const {
const std::lock_guard<std::mutex> lock(fdent_lock);
return (-1 != physical_fd);
}
bool FindPseudoFd(int fd) const {
const std::lock_guard<std::mutex> lock(fdent_lock);
return FindPseudoFdWithLock(fd);
Expand All @@ -135,9 +137,12 @@ class FdEntity
}
int GetOpenCountHasLock() const REQUIRES(FdEntity::fdent_lock);
// TODO: should this require a lock?
const std::string& GetPath() const { return path; }
std::string GetPath() const {
const std::lock_guard<std::mutex> lock(fdent_lock);
return path;
}
bool RenamePath(const std::string& newpath, std::string& fentmapkey);
int GetPhysicalFd() const { return physical_fd; }
int GetPhysicalFd() const REQUIRES(fdent_lock) { return physical_fd; }
bool IsModified() const;
bool MergeOrgMeta(headers_t& updatemeta);
int UploadPending(int fd) {
Expand All @@ -163,15 +168,16 @@ class FdEntity
int SetAtimeHasLock(struct timespec time) REQUIRES(FdEntity::fdent_lock);
int SetMCtime(struct timespec mtime, struct timespec ctime) {
const std::lock_guard<std::mutex> lock(fdent_lock);
const std::lock_guard<std::mutex> lock2(fdent_data_lock);
return SetMCtimeHasLock(mtime, ctime);
}
int SetMCtimeHasLock(struct timespec mtime, struct timespec ctime) REQUIRES(FdEntity::fdent_lock);
int SetMCtimeHasLock(struct timespec mtime, struct timespec ctime) REQUIRES(fdent_lock, fdent_data_lock);
bool UpdateCtime();
bool UpdateAtime();
bool UpdateMtime(bool clear_holding_mtime = false);
bool UpdateMCtime();
bool SetHoldingMtime(struct timespec mtime);
bool ClearHoldingMtime() REQUIRES(FdEntity::fdent_lock);
bool ClearHoldingMtime() REQUIRES(fdent_lock, fdent_data_lock);
bool GetSize(off_t& size) const;
bool GetXattr(std::string& xattr) const;
bool SetXattr(const std::string& xattr);
Expand Down Expand Up @@ -199,12 +205,15 @@ class FdEntity
const std::lock_guard<std::mutex> lock(fdent_lock);
return RowFlushHasLock(fd, tpath, force_sync);
}
int RowFlushHasLock(int fd, const char* tpath, bool force_sync = false) REQUIRES(FdEntity::fdent_lock);
int RowFlushHasLock(int fd, const char* tpath, bool force_sync = false) REQUIRES(FdEntity::fdent_lock) {
const std::lock_guard<std::mutex> lock(fdent_data_lock);
return RowFlushWithoutLock(fd, tpath, force_sync);
}
int RowFlushWithoutLock(int fd, const char* tpath, bool force_sync) REQUIRES(FdEntity::fdent_lock, FdEntity::fdent_data_lock);
int Flush(int fd, bool force_sync = false) {
const std::lock_guard<std::mutex> lock(fdent_lock);
return FlushHasLock(fd, force_sync);
return RowFlushHasLock(fd, nullptr, force_sync);
}
int FlushHasLock(int fd, bool force_sync = false) REQUIRES(FdEntity::fdent_lock) { return RowFlushHasLock(fd, nullptr, force_sync); }

ssize_t Read(int fd, char* bytes, off_t start, size_t size, bool force_load = false);
ssize_t Write(int fd, const char* bytes, off_t start, size_t size);
Expand All @@ -215,8 +224,11 @@ class FdEntity
bool IsDirtyNewFile() const;
void MarkDirtyMetadata();

bool GetLastUpdateUntreatedPart(off_t& start, off_t& size) const;
bool ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size);
// [NOTE]
// Callback from PseudoFdInfo::UploadBoundaryLastUntreatedArea()
//
bool GetLastUpdateUntreatedPart(off_t& start, off_t& size) const REQUIRES(FdEntity::fdent_lock);
bool ReplaceLastUpdateUntreatedPart(off_t front_start, off_t front_size, off_t behind_start, off_t behind_size) REQUIRES(FdEntity::fdent_lock);
};

typedef std::map<std::string, std::unique_ptr<FdEntity>> fdent_map_t; // key=path, value=unique_ptr<FdEntity>
Expand Down
16 changes: 8 additions & 8 deletions src/s3fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1534,16 +1534,16 @@ static int rename_object(const char* from, const char* to, bool update_ctime)
ent->SetAtime(atime);
}
}
}

// copy
if(0 != (result = put_headers(to, meta, true, /* use_st_size= */ false))){
return result;
}

// rename
FdManager::get()->Rename(from, to);
// copy
if(0 != (result = put_headers(to, meta, true, /* use_st_size= */ false))){
return result;
}

// rename
FdManager::get()->Rename(from, to);

// Remove file
result = s3fs_unlink(from);

Expand Down Expand Up @@ -1594,8 +1594,8 @@ static int rename_object_nocopy(const char* from, const char* to, bool update_ct
S3FS_PRN_ERR("could not upload file(%s): result=%d", to, result);
return result;
}
FdManager::get()->Rename(from, to);
}
FdManager::get()->Rename(from, to);

// Remove file
result = s3fs_unlink(from);
Expand Down

0 comments on commit ea9589d

Please sign in to comment.