Skip to content

Commit

Permalink
Make psemaphore similar to C++20 std::counting_semaphore (s3fs-fuse#2569
Browse files Browse the repository at this point in the history
)
  • Loading branch information
gaul authored Oct 28, 2024
1 parent 0788119 commit 3b226ed
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "common.h"
#include "fdcache_page.h"
#include "metaheader.h"
#include "psemaphore.h"
#include "s3fs_util.h"
#include "types.h"

Expand Down Expand Up @@ -86,7 +87,6 @@ typedef std::unique_ptr<CURL, decltype(&curl_easy_cleanup)> CurlUniquePtr;
class CurlHandlerPool;
class S3fsCred;
class S3fsCurl;
class Semaphore;

// Prototype function for lazy setup options for curl handle
typedef bool (*s3fscurl_lazy_setup)(S3fsCurl* s3fscurl);
Expand Down
11 changes: 6 additions & 5 deletions src/curl_multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,16 @@ int S3fsMultiCurl::MultiPerform()
std::map<std::thread::id, std::pair<std::thread, std::future<int>>> threads;
int result = 0;
bool isMultiHead = false;
Semaphore sem(GetMaxParallelism());
int semCount = GetMaxParallelism();
Semaphore sem(semCount);

for(auto iter = clist_req.cbegin(); iter != clist_req.cend(); ++iter) {
S3fsCurl* s3fscurl = iter->get();
if(!s3fscurl){
continue;
}

sem.wait();
sem.acquire();

{
const std::lock_guard<std::mutex> lock(completed_tids_lock);
Expand Down Expand Up @@ -155,8 +156,8 @@ int S3fsMultiCurl::MultiPerform()
threads.emplace(std::piecewise_construct, std::forward_as_tuple(thread_id), std::forward_as_tuple(std::move(thread), std::move(future)));
}

for(int i = 0; i < sem.get_value(); ++i){
sem.wait();
for(int i = 0; i < semCount; ++i){
sem.acquire();
}

const std::lock_guard<std::mutex> lock(completed_tids_lock);
Expand Down Expand Up @@ -355,7 +356,7 @@ void S3fsMultiCurl::RequestPerformWrapper(S3fsCurl* s3fscurl, std::promise<int>

const std::lock_guard<std::mutex> lock(*s3fscurl->completed_tids_lock);
s3fscurl->completed_tids->push_back(std::this_thread::get_id());
s3fscurl->sem->post();
s3fscurl->sem->release();

promise.set_value(result);
}
Expand Down
2 changes: 1 addition & 1 deletion src/fdcache_fdinfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ int PseudoFdInfo::WaitAllThreadsExit()

while(is_loop){
// need to wait the worker exiting
uploaded_sem.wait();
uploaded_sem.acquire();
{
const std::lock_guard<std::mutex> lock(upload_list_lock);
if(0 < completed_count){
Expand Down
30 changes: 18 additions & 12 deletions src/psemaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
//-------------------------------------------------------------------
// Class Semaphore
//-------------------------------------------------------------------
#if __cplusplus >= 202002L

#include <semaphore>
typedef std::counting_semaphore<INT_MAX> Semaphore;

#else

// portability wrapper for sem_t since macOS does not implement it
#ifdef __APPLE__

Expand All @@ -36,8 +43,8 @@ class Semaphore
~Semaphore()
{
// macOS cannot destroy a semaphore with posts less than the initializer
for(int i = 0; i < get_value(); ++i){
post();
for(int i = 0; i < value; ++i){
release();
}
dispatch_release(sem);
}
Expand All @@ -46,17 +53,16 @@ class Semaphore
Semaphore& operator=(const Semaphore&) = delete;
Semaphore& operator=(Semaphore&&) = delete;

void wait() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); }
bool try_wait()
void acquire() { dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); }
bool try_acquire()
{
if(0 == dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW)){
return true;
}else{
return false;
}
}
void post() { dispatch_semaphore_signal(sem); }
int get_value() const { return value; }
void release() { dispatch_semaphore_signal(sem); }

private:
int value;
Expand All @@ -71,16 +77,16 @@ class Semaphore
class Semaphore
{
public:
explicit Semaphore(int value) : value(value) { sem_init(&mutex, 0, value); }
explicit Semaphore(int value) { sem_init(&mutex, 0, value); }
~Semaphore() { sem_destroy(&mutex); }
void wait()
void acquire()
{
int r;
do {
r = sem_wait(&mutex);
} while (r == -1 && errno == EINTR);
}
bool try_wait()
bool try_acquire()
{
int result;
do{
Expand All @@ -89,16 +95,16 @@ class Semaphore

return (0 == result);
}
void post() { sem_post(&mutex); }
int get_value() const { return value; }
void release() { sem_post(&mutex); }

private:
int value;
sem_t mutex;
};

#endif

#endif

#endif // S3FS_SEMAPHORE_H_

/*
Expand Down
10 changes: 5 additions & 5 deletions src/sighandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ void S3fsSignals::CheckCacheWorker(Semaphore* pSem)
// wait and loop
while(S3fsSignals::enableUsr1){
// wait
pSem->wait();
pSem->acquire();

// cppcheck-suppress unmatchedSuppression
// cppcheck-suppress knownConditionTrueFalse
if(!S3fsSignals::enableUsr1){
break; // assap
break; // asap
}

// check all cache
Expand All @@ -114,7 +114,7 @@ void S3fsSignals::CheckCacheWorker(Semaphore* pSem)
}

// do not allow request queuing
while(pSem->try_wait());
while(pSem->try_acquire());
}
}

Expand Down Expand Up @@ -219,7 +219,7 @@ bool S3fsSignals::DestroyUsr1Handler()
S3fsSignals::enableUsr1 = false;

// wakeup thread
pSemUsr1->post();
pSemUsr1->release();

// wait for thread exiting
pThreadUsr1->join();
Expand All @@ -235,7 +235,7 @@ bool S3fsSignals::WakeupUsr1Thread()
S3FS_PRN_ERR("The thread for SIGUSR1 is not setup.");
return false;
}
pSemUsr1->post();
pSemUsr1->release();
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/sighandlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <memory>
#include <thread>

class Semaphore;
#include "psemaphore.h"

//----------------------------------------------
// class S3fsSignals
Expand Down
10 changes: 5 additions & 5 deletions src/threadpoolman.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise<int> promise)

while(!psingleton->IsExit()){
// wait
psingleton->thpoolman_sem.wait();
psingleton->thpoolman_sem.acquire();

if(psingleton->IsExit()){
break;
Expand All @@ -101,7 +101,7 @@ void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise<int> promise)
S3FS_PRN_WARN("The instruction function returned with something error code(%ld).", reinterpret_cast<long>(retval));
}
if(param.psem){
param.psem->post();
param.psem->release();
}
}

Expand Down Expand Up @@ -156,7 +156,7 @@ bool ThreadPoolMan::StopThreads()
// all threads to exit
SetExitFlag(true);
for(size_t waitcnt = thread_list.size(); 0 < waitcnt; --waitcnt){
thpoolman_sem.post();
thpoolman_sem.release();
}

// wait for threads exiting
Expand All @@ -168,7 +168,7 @@ bool ThreadPoolMan::StopThreads()
thread_list.clear();

// reset semaphore(to zero)
while(thpoolman_sem.try_wait()){
while(thpoolman_sem.try_acquire()){
}

return true;
Expand Down Expand Up @@ -212,7 +212,7 @@ void ThreadPoolMan::SetInstruction(const thpoolman_param& param)
}

// run thread
thpoolman_sem.post();
thpoolman_sem.release();
}

/*
Expand Down

0 comments on commit 3b226ed

Please sign in to comment.