Skip to content

Commit

Permalink
Fix metaserver deadlock caused by bthread coroutine switching
Browse files Browse the repository at this point in the history
Signed-off-by: Hanqing Wu <[email protected]>
  • Loading branch information
wu-hanqing authored and wuhongsong committed Nov 8, 2023
1 parent d996e02 commit 6639d26
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 46 deletions.
97 changes: 56 additions & 41 deletions curvefs/src/metaserver/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <algorithm>
#include <cstdint>
#include <future>
#include <memory>
#include <string>
#include <utility>
Expand Down Expand Up @@ -537,54 +538,68 @@ MetaStatusCode Partition::GetAllBlockGroup(
}

void Partition::StartS3Compact() {
S3CompactManager::GetInstance().Register(
S3Compact{inodeManager_, partitionInfo_});
// register s3 compaction task in a separate thread, since the caller may
// holds a pthread wrlock when calling this function, and create `S3Compact`
// will acquire a bthread rwlock, may cause thread switching, thus causing a
// deadlock.
// FIXME(wuhanqing): handle it in a more elegant way
auto handle = std::async(std::launch::async, [this]() {
S3CompactManager::GetInstance().Register(
S3Compact{inodeManager_, partitionInfo_});
});

handle.wait();
}

void Partition::CancelS3Compact() {
S3CompactManager::GetInstance().Cancel(partitionInfo_.partitionid());
}

void Partition::StartVolumeDeallocate() {
FsInfo fsInfo;
bool ok =
FsInfoManager::GetInstance().GetFsInfo(partitionInfo_.fsid(), &fsInfo);
if (!ok) {
LOG(ERROR)
<< "Partition start volume deallocate fail, get fsinfo fail. fsid="
<< partitionInfo_.fsid();
return;
}

if (!fsInfo.detail().has_volume()) {
LOG(INFO) << "Partition not belong to volume, do not need start "
"deallocate. partitionInfo="
<< partitionInfo_.DebugString();
return;
}

VolumeDeallocateCalOption calOpt;
calOpt.kvStorage = kvStorage_;
calOpt.inodeStorage = inodeStorage_;
calOpt.nameGen = nameGen_;
auto copysetNode =
copyset::CopysetNodeManager::GetInstance().GetSharedCopysetNode(
partitionInfo_.poolid(), partitionInfo_.copysetid());
if (copysetNode == nullptr) {
LOG(ERROR) << "Partition get copyset node failed. poolid="
<< partitionInfo_.poolid()
<< ", copysetid=" << partitionInfo_.copysetid();
return;
}

InodeVolumeSpaceDeallocate task(partitionInfo_.fsid(),
partitionInfo_.partitionid(), copysetNode);
task.Init(calOpt);

VolumeDeallocateManager::GetInstance().Register(std::move(task));

VLOG(3) << "Partition start volume deallocate success. partitionInfo="
<< partitionInfo_.DebugString();
// FIXME(wuhanqing): same as `StartS3Compact`
auto handle = std::async(std::launch::async, [this]() {
FsInfo fsInfo;
bool ok = FsInfoManager::GetInstance().GetFsInfo(
partitionInfo_.fsid(), &fsInfo);
if (!ok) {
LOG(ERROR) << "Partition start volume deallocate fail, get fsinfo "
"fail. fsid="
<< partitionInfo_.fsid();
return;
}

if (!fsInfo.detail().has_volume()) {
LOG(INFO) << "Partition not belong to volume, do not need start "
"deallocate. partitionInfo="
<< partitionInfo_.DebugString();
return;
}

VolumeDeallocateCalOption calOpt;
calOpt.kvStorage = kvStorage_;
calOpt.inodeStorage = inodeStorage_;
calOpt.nameGen = nameGen_;
auto copysetNode =
copyset::CopysetNodeManager::GetInstance().GetSharedCopysetNode(
partitionInfo_.poolid(), partitionInfo_.copysetid());
if (copysetNode == nullptr) {
LOG(ERROR) << "Partition get copyset node failed. poolid="
<< partitionInfo_.poolid()
<< ", copysetid=" << partitionInfo_.copysetid();
return;
}

InodeVolumeSpaceDeallocate task(
partitionInfo_.fsid(), partitionInfo_.partitionid(), copysetNode);
task.Init(calOpt);

VolumeDeallocateManager::GetInstance().Register(std::move(task));

VLOG(3) << "Partition start volume deallocate success. partitionInfo="
<< partitionInfo_.DebugString();
});

handle.wait();
}

void Partition::CancelVolumeDeallocate() {
Expand Down
58 changes: 53 additions & 5 deletions src/common/concurrent/rw_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,31 @@
#ifndef SRC_COMMON_CONCURRENT_RW_LOCK_H_
#define SRC_COMMON_CONCURRENT_RW_LOCK_H_

#include <pthread.h>
#include <assert.h>
#include <glog/logging.h>
#include <bthread/bthread.h>
#include <glog/logging.h>
#include <pthread.h>
#include <sys/types.h> // gettid

#include "include/curve_compiler_specific.h"
#include "src/common/uncopyable.h"

// Due to the mixed use of bthread and pthread in some cases, acquiring another
// bthread lock(mutex/rwlock) after acquiring a write lock on a pthread rwlock
// may result in switching the bthread coroutine, and then the operation of
// releasing the previous write lock in the other pthread will not take effect
// (implying that the write lock is still held), thus causing a deadlock.

// Check pthread rwlock tid between wrlock and unlock
#if defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID) && \
(ENABLE_CHECK_PTHREAD_WRLOCK_TID == 1)
#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1
#elif !defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID)
#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1
#else
#define CURVE_CHECK_PTHREAD_WRLOCK_TID 0
#endif

namespace curve {
namespace common {

Expand All @@ -51,10 +69,21 @@ class PthreadRWLockBase : public RWLockBase {
void WRLock() override {
int ret = pthread_rwlock_wrlock(&rwlock_);
CHECK(0 == ret) << "wlock failed: " << ret << ", " << strerror(ret);
#if CURVE_CHECK_PTHREAD_WRLOCK_TID
tid_ = gettid();
#endif
}

int TryWRLock() override {
return pthread_rwlock_trywrlock(&rwlock_);
int ret = pthread_rwlock_trywrlock(&rwlock_);
if (CURVE_UNLIKELY(ret != 0)) {
return ret;
}

#if CURVE_CHECK_PTHREAD_WRLOCK_TID
tid_ = gettid();
#endif
return 0;
}

void RDLock() override {
Expand All @@ -67,6 +96,19 @@ class PthreadRWLockBase : public RWLockBase {
}

void Unlock() override {
#if CURVE_CHECK_PTHREAD_WRLOCK_TID
if (tid_ != 0) {
const pid_t current = gettid();
// If CHECK here is triggered, please look at the comments at the
// beginning of the file.
// In the meantime, the simplest solution might be to use
// `BthreadRWLock` locks everywhere.
CHECK(tid_ == current)
<< ", tid has changed, previous tid: " << tid_
<< ", current tid: " << current;
tid_ = 0;
}
#endif
pthread_rwlock_unlock(&rwlock_);
}

Expand All @@ -76,8 +118,14 @@ class PthreadRWLockBase : public RWLockBase {

pthread_rwlock_t rwlock_;
pthread_rwlockattr_t rwlockAttr_;

#if CURVE_CHECK_PTHREAD_WRLOCK_TID
pid_t tid_ = 0;
#endif
};

#undef CURVE_CHECK_PTHREAD_WRLOCK_TID

class RWLock : public PthreadRWLockBase {
public:
RWLock() {
Expand Down Expand Up @@ -122,7 +170,7 @@ class BthreadRWLock : public RWLockBase {
}

int TryWRLock() override {
// not support yet
LOG(WARNING) << "TryWRLock not support yet";
return EINVAL;
}

Expand All @@ -132,7 +180,7 @@ class BthreadRWLock : public RWLockBase {
}

int TryRDLock() override {
// not support yet
LOG(WARNING) << "TryRDLock not support yet";
return EINVAL;
}

Expand Down

0 comments on commit 6639d26

Please sign in to comment.