From c78716c64124a39b0e8629ba79e17ee63c006e61 Mon Sep 17 00:00:00 2001 From: Takeshi Nakatani Date: Mon, 15 Jul 2024 06:40:05 +0000 Subject: [PATCH] Added new class for curl share handle Added new class for curl share handle. And, paired the curl handle(S3fsCurl) with the worker thread. Changed that each thread has its own SSL session cache to prevent data races. So OpenSSL suppression for ThreadSanitizer is no longer necessary, so reverted it. --- .github/workflows/ci.yml | 4 +- src/Makefile.am | 1 + src/curl.cpp | 122 +------------ src/curl.h | 9 - src/curl_share.cpp | 239 ++++++++++++++++++++++++++ src/curl_share.h | 89 ++++++++++ src/s3fs.cpp | 5 +- src/s3fs_threadreqs.cpp | 76 ++++---- src/s3fs_threadreqs.h | 28 +-- src/threadpoolman.cpp | 20 ++- src/threadpoolman.h | 4 +- test/run_tests_using_sanitizers.sh | 2 +- test/threadsanitizer_suppressions.txt | 1 - 13 files changed, 419 insertions(+), 181 deletions(-) create mode 100644 src/curl_share.cpp create mode 100644 src/curl_share.h delete mode 100644 test/threadsanitizer_suppressions.txt diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c982759d0..86c7cedd6e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -246,7 +246,7 @@ jobs: elif [ "${{ matrix.checktype }}" = "sanitize_thread" ]; then echo 'CXX=clang++' echo "CXXFLAGS=${COMMON_CXXFLAGS} -O0 -fsanitize=thread" - echo 'TSAN_OPTIONS=halt_on_error=1,suppressions=threadsanitizer_suppressions.txt' + echo 'TSAN_OPTIONS=halt_on_error=1' # [NOTE] # Set this to avoid following error when running configure. # "FATAL: ThreadSanitizer: unexpected memory mapping" @@ -269,7 +269,7 @@ jobs: - name: Build run: | ./autogen.sh - /bin/sh -c "CXX=${CXX} CXXFLAGS=\"${CXXFLAGS}\" LDFLAGS=\"${LDFLAGS}\" TSAN_OPTIONS=\"\" ./configure --prefix=/usr --with-openssl" + /bin/sh -c "CXX=${CXX} CXXFLAGS=\"${CXXFLAGS}\" LDFLAGS=\"${LDFLAGS}\" ./configure --prefix=/usr --with-openssl" make - name: Test suite diff --git a/src/Makefile.am b/src/Makefile.am index fc2d56a20b..94d838ae4d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -36,6 +36,7 @@ s3fs_SOURCES = \ metaheader.cpp \ mpu_util.cpp \ curl.cpp \ + curl_share.cpp \ curl_util.cpp \ s3objlist.cpp \ cache.cpp \ diff --git a/src/curl.cpp b/src/curl.cpp index f360d11073..0f57acf0a7 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -35,6 +35,7 @@ #include "s3fs.h" #include "s3fs_logger.h" #include "curl.h" +#include "curl_share.h" #include "curl_util.h" #include "s3fs_auth.h" #include "s3fs_cred.h" @@ -98,10 +99,7 @@ constexpr char S3fsCurl::S3FS_SSL_PRIVKEY_PASSWORD[]; std::mutex S3fsCurl::curl_handles_lock; S3fsCurl::callback_locks_t S3fsCurl::callback_locks; bool S3fsCurl::is_initglobal_done = false; -CURLSH* S3fsCurl::hCurlShare = nullptr; bool S3fsCurl::is_cert_check = true; // default -bool S3fsCurl::is_dns_cache = true; // default -bool S3fsCurl::is_ssl_session_cache= false;// default(This turns OFF now, but turns ON again last PR) long S3fsCurl::connect_timeout = 300; // default time_t S3fsCurl::readwrite_timeout = 120; // default int S3fsCurl::retries = 5; // default @@ -153,9 +151,6 @@ bool S3fsCurl::InitS3fsCurl() if(!S3fsCurl::InitGlobalCurl()){ return false; } - if(!S3fsCurl::InitShareCurl()){ - return false; - } if(!S3fsCurl::InitCryptMutex()){ return false; } @@ -169,9 +164,6 @@ bool S3fsCurl::DestroyS3fsCurl() if(!S3fsCurl::DestroyCryptMutex()){ result = false; } - if(!S3fsCurl::DestroyShareCurl()){ - result = false; - } if(!S3fsCurl::DestroyGlobalCurl()){ result = false; } @@ -201,97 +193,6 @@ bool S3fsCurl::DestroyGlobalCurl() return true; } -bool S3fsCurl::InitShareCurl() -{ - CURLSHcode nSHCode; - - if(!S3fsCurl::is_dns_cache && !S3fsCurl::is_ssl_session_cache){ - S3FS_PRN_INFO("Curl does not share DNS data."); - return true; - } - if(S3fsCurl::hCurlShare){ - S3FS_PRN_WARN("already initiated."); - return false; - } - if(nullptr == (S3fsCurl::hCurlShare = curl_share_init())){ - S3FS_PRN_ERR("curl_share_init failed"); - return false; - } - if(CURLSHE_OK != (nSHCode = curl_share_setopt(S3fsCurl::hCurlShare, CURLSHOPT_LOCKFUNC, S3fsCurl::LockCurlShare))){ - S3FS_PRN_ERR("curl_share_setopt(LOCKFUNC) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); - return false; - } - if(CURLSHE_OK != (nSHCode = curl_share_setopt(S3fsCurl::hCurlShare, CURLSHOPT_UNLOCKFUNC, S3fsCurl::UnlockCurlShare))){ - S3FS_PRN_ERR("curl_share_setopt(UNLOCKFUNC) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); - return false; - } - if(S3fsCurl::is_dns_cache){ - nSHCode = curl_share_setopt(S3fsCurl::hCurlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); - if(CURLSHE_OK != nSHCode && CURLSHE_BAD_OPTION != nSHCode && CURLSHE_NOT_BUILT_IN != nSHCode){ - S3FS_PRN_ERR("curl_share_setopt(DNS) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); - return false; - }else if(CURLSHE_BAD_OPTION == nSHCode || CURLSHE_NOT_BUILT_IN == nSHCode){ - S3FS_PRN_WARN("curl_share_setopt(DNS) returns %d(%s), but continue without shared dns data.", nSHCode, curl_share_strerror(nSHCode)); - } - } - if(S3fsCurl::is_ssl_session_cache){ - nSHCode = curl_share_setopt(S3fsCurl::hCurlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION); - if(CURLSHE_OK != nSHCode && CURLSHE_BAD_OPTION != nSHCode && CURLSHE_NOT_BUILT_IN != nSHCode){ - S3FS_PRN_ERR("curl_share_setopt(SSL SESSION) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); - return false; - }else if(CURLSHE_BAD_OPTION == nSHCode || CURLSHE_NOT_BUILT_IN == nSHCode){ - S3FS_PRN_WARN("curl_share_setopt(SSL SESSION) returns %d(%s), but continue without shared ssl session data.", nSHCode, curl_share_strerror(nSHCode)); - } - } - if(CURLSHE_OK != (nSHCode = curl_share_setopt(S3fsCurl::hCurlShare, CURLSHOPT_USERDATA, &S3fsCurl::callback_locks))){ - S3FS_PRN_ERR("curl_share_setopt(USERDATA) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); - return false; - } - return true; -} - -bool S3fsCurl::DestroyShareCurl() -{ - if(!S3fsCurl::hCurlShare){ - if(!S3fsCurl::is_dns_cache && !S3fsCurl::is_ssl_session_cache){ - return true; - } - S3FS_PRN_WARN("already destroy share curl."); - return false; - } - if(CURLSHE_OK != curl_share_cleanup(S3fsCurl::hCurlShare)){ - return false; - } - S3fsCurl::hCurlShare = nullptr; - return true; -} - -void S3fsCurl::LockCurlShare(CURL* handle, curl_lock_data nLockData, curl_lock_access laccess, void* useptr) -{ - if(!hCurlShare){ - return; - } - auto* locks = static_cast(useptr); - if(CURL_LOCK_DATA_DNS == nLockData){ - locks->dns.lock(); - }else if(CURL_LOCK_DATA_SSL_SESSION == nLockData){ - locks->ssl_session.lock(); - } -} - -void S3fsCurl::UnlockCurlShare(CURL* handle, curl_lock_data nLockData, void* useptr) -{ - if(!hCurlShare){ - return; - } - auto* locks = static_cast(useptr); - if(CURL_LOCK_DATA_DNS == nLockData){ - locks->dns.unlock(); - }else if(CURL_LOCK_DATA_SSL_SESSION == nLockData){ - locks->ssl_session.unlock(); - } -} - bool S3fsCurl::InitCryptMutex() { return s3fs_init_crypt_mutex(); @@ -668,26 +569,12 @@ bool S3fsCurl::SetCheckCertificate(bool isCertCheck) return old; } -bool S3fsCurl::SetDnsCache(bool isCache) -{ - bool old = S3fsCurl::is_dns_cache; - S3fsCurl::is_dns_cache = isCache; - return old; -} - void S3fsCurl::ResetOffset(S3fsCurl* pCurl) { pCurl->partdata.startpos = pCurl->b_partdata_startpos; pCurl->partdata.size = pCurl->b_partdata_size; } -bool S3fsCurl::SetSslSessionCache(bool isCache) -{ - bool old = S3fsCurl::is_ssl_session_cache; - S3fsCurl::is_ssl_session_cache = isCache; - return old; -} - long S3fsCurl::SetConnectTimeout(long timeout) { long old = S3fsCurl::connect_timeout; @@ -1585,11 +1472,10 @@ bool S3fsCurl::ResetHandle() } } - if((S3fsCurl::is_dns_cache || S3fsCurl::is_ssl_session_cache) && S3fsCurl::hCurlShare){ - if(CURLE_OK != curl_easy_setopt(hCurl, CURLOPT_SHARE, S3fsCurl::hCurlShare)){ - return false; - } + if(!S3fsCurlShare::SetCurlShareHandle(hCurl.get())){ + return false; } + if(!S3fsCurl::is_cert_check) { S3FS_PRN_DBG("'no_check_certificate' option in effect."); S3FS_PRN_DBG("The server certificate won't be checked against the available certificate authorities."); diff --git a/src/curl.h b/src/curl.h index f499d48cfb..2294ca9195 100644 --- a/src/curl.h +++ b/src/curl.h @@ -127,10 +127,7 @@ class S3fsCurl std::mutex ssl_session; } callback_locks; static bool is_initglobal_done; - static CURLSH* hCurlShare; static bool is_cert_check; - static bool is_dns_cache; - static bool is_ssl_session_cache; static long connect_timeout; static time_t readwrite_timeout; static int retries; @@ -211,10 +208,6 @@ class S3fsCurl // class methods static bool InitGlobalCurl(); static bool DestroyGlobalCurl(); - static bool InitShareCurl(); - static bool DestroyShareCurl(); - static void LockCurlShare(CURL* handle, curl_lock_data nLockData, curl_lock_access laccess, void* useptr) NO_THREAD_SAFETY_ANALYSIS; - static void UnlockCurlShare(CURL* handle, curl_lock_data nLockData, void* useptr) NO_THREAD_SAFETY_ANALYSIS; static bool InitCryptMutex(); static bool DestroyCryptMutex(); static int CurlProgress(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow); @@ -274,8 +267,6 @@ class S3fsCurl // class methods(variables) static std::string LookupMimeType(const std::string& name); static bool SetCheckCertificate(bool isCertCheck); - static bool SetDnsCache(bool isCache); - static bool SetSslSessionCache(bool isCache); static long SetConnectTimeout(long timeout); static time_t SetReadwriteTimeout(time_t timeout); static time_t GetReadwriteTimeout() { return S3fsCurl::readwrite_timeout; } diff --git a/src/curl_share.cpp b/src/curl_share.cpp new file mode 100644 index 0000000000..a9229fb407 --- /dev/null +++ b/src/curl_share.cpp @@ -0,0 +1,239 @@ +/* + * s3fs - FUSE-based file system backed by Amazon S3 + * + * Copyright(C) 2007 Randy Rizun + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#include "common.h" +#include "s3fs.h" +#include "s3fs_logger.h" +#include "curl_share.h" + +//------------------------------------------------------------------- +// Class S3fsCurlShare +//------------------------------------------------------------------- +bool S3fsCurlShare::is_dns_cache = true; // default +bool S3fsCurlShare::is_ssl_cache = true; // default +std::mutex S3fsCurlShare::curl_share_lock; +std::map S3fsCurlShare::ShareHandles; +std::map S3fsCurlShare::ShareLocks; + +//------------------------------------------------------------------- +// Class methods for S3fsCurlShare +//------------------------------------------------------------------- +bool S3fsCurlShare::SetDnsCache(bool isCache) +{ + bool old = S3fsCurlShare::is_dns_cache; + S3fsCurlShare::is_dns_cache = isCache; + return old; +} + +bool S3fsCurlShare::SetSslSessionCache(bool isCache) +{ + bool old = S3fsCurlShare::is_ssl_cache; + S3fsCurlShare::is_ssl_cache = isCache; + return old; +} + +void S3fsCurlShare::LockCurlShare(CURL* handle, curl_lock_data nLockData, curl_lock_access laccess, void* useptr) +{ + auto* pLocks = static_cast(useptr); + + if(CURL_LOCK_DATA_DNS == nLockData){ + pLocks->lock_dns.lock(); + }else if(CURL_LOCK_DATA_SSL_SESSION == nLockData){ + pLocks->lock_session.lock(); + } +} + +void S3fsCurlShare::UnlockCurlShare(CURL* handle, curl_lock_data nLockData, void* useptr) +{ + auto* pLocks = static_cast(useptr); + + if(CURL_LOCK_DATA_DNS == nLockData){ + pLocks->lock_dns.unlock(); + }else if(CURL_LOCK_DATA_SSL_SESSION == nLockData){ + pLocks->lock_session.unlock(); + } +} + +bool S3fsCurlShare::SetCurlShareHandle(CURL* hCurl) +{ + if(!hCurl){ + S3FS_PRN_ERR("Curl handle is null"); + return false; + } + + // get curl share handle + S3fsCurlShare CurlShareObj; + CURLSH* hCurlShare = CurlShareObj.GetCurlShareHandle(); + if(!hCurlShare){ + // a case of not to use CurlShare + return true; + } + + // set share handle to curl handle + if(CURLE_OK != curl_easy_setopt(hCurl, CURLOPT_SHARE, hCurlShare)){ + S3FS_PRN_ERR("Failed to set Curl share handle to curl handle."); + return false; + } + return true; +} + +bool S3fsCurlShare::DestroyCurlShareHandleForThread() +{ + S3fsCurlShare CurlShareObj; + return CurlShareObj.DestroyCurlShareHandle(); +} + +bool S3fsCurlShare::InitializeCurlShare(const CurlSharePtr& hShare, const ShareLocksPtr& ShareLock) +{ + CURLSHcode nSHCode; + + // set lock handlers + if(CURLSHE_OK != (nSHCode = curl_share_setopt(hShare.get(), CURLSHOPT_LOCKFUNC, S3fsCurlShare::LockCurlShare))){ + S3FS_PRN_ERR("curl_share_setopt(LOCKFUNC) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); + return false; + } + if(CURLSHE_OK != (nSHCode = curl_share_setopt(hShare.get(), CURLSHOPT_UNLOCKFUNC, S3fsCurlShare::UnlockCurlShare))){ + S3FS_PRN_ERR("curl_share_setopt(UNLOCKFUNC) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); + return false; + } + + // set user data for lock functions + if(CURLSHE_OK != (nSHCode = curl_share_setopt(hShare.get(), CURLSHOPT_USERDATA, ShareLock.get()))){ + S3FS_PRN_ERR("curl_share_setopt(USERDATA) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); + return false; + } + + // set share type + if(S3fsCurlShare::is_dns_cache){ + nSHCode = curl_share_setopt(hShare.get(), CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); + if(CURLSHE_OK != nSHCode && CURLSHE_BAD_OPTION != nSHCode && CURLSHE_NOT_BUILT_IN != nSHCode){ + S3FS_PRN_ERR("curl_share_setopt(DNS) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); + return false; + }else if(CURLSHE_BAD_OPTION == nSHCode || CURLSHE_NOT_BUILT_IN == nSHCode){ + S3FS_PRN_WARN("curl_share_setopt(DNS) returns %d(%s), but continue without shared dns data.", nSHCode, curl_share_strerror(nSHCode)); + } + } + if(S3fsCurlShare::is_ssl_cache){ + nSHCode = curl_share_setopt(hShare.get(), CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION); + if(CURLSHE_OK != nSHCode && CURLSHE_BAD_OPTION != nSHCode && CURLSHE_NOT_BUILT_IN != nSHCode){ + S3FS_PRN_ERR("curl_share_setopt(SSL SESSION) returns %d(%s)", nSHCode, curl_share_strerror(nSHCode)); + return false; + }else if(CURLSHE_BAD_OPTION == nSHCode || CURLSHE_NOT_BUILT_IN == nSHCode){ + S3FS_PRN_WARN("curl_share_setopt(SSL SESSION) returns %d(%s), but continue without shared ssl session data.", nSHCode, curl_share_strerror(nSHCode)); + } + } + + return true; +} + +//------------------------------------------------------------------- +// Methods for S3fsCurlShare +//------------------------------------------------------------------- +// [NOTE] +// set current thread id(std style) to ThreadId +// +S3fsCurlShare::S3fsCurlShare() : ThreadId(std::this_thread::get_id()) +{ +} + +bool S3fsCurlShare::DestroyCurlShareHandle() +{ + if(!S3fsCurlShare::is_dns_cache && !S3fsCurlShare::is_ssl_cache){ + // Any curl share handle does not exist + return true; + } + + const std::lock_guard lock(S3fsCurlShare::curl_share_lock); + + // find existed handle and cleanup it + auto handle_iter = S3fsCurlShare::ShareHandles.find(ThreadId); + if(handle_iter == S3fsCurlShare::ShareHandles.end()){ + S3FS_PRN_WARN("Not found curl share handle"); + }else{ + if(CURLSHE_OK != curl_share_cleanup(handle_iter->second.get())){ + S3FS_PRN_ERR("Failed to cleanup curl share handle"); + return false; + } + S3fsCurlShare::ShareHandles.erase(handle_iter); + } + + // find lock and cleanup it + auto locks_iter = S3fsCurlShare::ShareLocks.find(ThreadId); + if(locks_iter == S3fsCurlShare::ShareLocks.end()){ + S3FS_PRN_WARN("Not found locks of curl share handle"); + }else{ + S3fsCurlShare::ShareLocks.erase(locks_iter); + } + + return true; +} + +CURLSH* S3fsCurlShare::GetCurlShareHandle() +{ + if(!S3fsCurlShare::is_dns_cache && !S3fsCurlShare::is_ssl_cache){ + // Any curl share handle does not exist + return nullptr; + } + + const std::lock_guard lock(S3fsCurlShare::curl_share_lock); + + // find existed handle + auto handle_iter = S3fsCurlShare::ShareHandles.find(ThreadId); + if(handle_iter != S3fsCurlShare::ShareHandles.end()){ + // Already created share handle for this thread. + return handle_iter->second.get(); + } + + // create new curl share handle and locks + CurlSharePtr hShare = {nullptr, curl_share_cleanup}; + hShare.reset(curl_share_init()); + if(!hShare){ + S3FS_PRN_ERR("Failed to create curl share handle"); + return nullptr; + } + ShareLocksPtr pLocks(new curl_share_locks); + + // Initialize curl share handle + if(!S3fsCurlShare::InitializeCurlShare(hShare, pLocks)){ + S3FS_PRN_ERR("Failed to initialize curl share handle"); + return nullptr; + } + + // set map + S3fsCurlShare::ShareHandles.emplace(ThreadId, std::move(hShare)); + S3fsCurlShare::ShareLocks.emplace(ThreadId, std::move(pLocks)); + + // For clang-tidy measures + handle_iter = S3fsCurlShare::ShareHandles.find(ThreadId); + if(handle_iter == S3fsCurlShare::ShareHandles.end()){ + S3FS_PRN_ERR("Failed to insert curl share to map."); + return nullptr; + } + return handle_iter->second.get(); +} + +/* +* Local variables: +* tab-width: 4 +* c-basic-offset: 4 +* End: +* vim600: expandtab sw=4 ts=4 fdm=marker +* vim<600: expandtab sw=4 ts=4 +*/ diff --git a/src/curl_share.h b/src/curl_share.h new file mode 100644 index 0000000000..e90a07a743 --- /dev/null +++ b/src/curl_share.h @@ -0,0 +1,89 @@ +/* + * s3fs - FUSE-based file system backed by Amazon S3 + * + * Copyright(C) 2007 Randy Rizun + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#ifndef S3FS_CURL_SHARE_H_ +#define S3FS_CURL_SHARE_H_ + +#include +#include +#include +#include +#include + +#include "common.h" + +//---------------------------------------------- +// Structure / Typedefs +//---------------------------------------------- +struct curl_share_locks { + std::mutex lock_dns; + std::mutex lock_session; +}; + +typedef std::unique_ptr CurlSharePtr; +typedef std::unique_ptr ShareLocksPtr; + +//---------------------------------------------- +// class S3fsCurlShare +//---------------------------------------------- +class S3fsCurlShare +{ + private: + static bool is_dns_cache; + static bool is_ssl_cache; + static std::mutex curl_share_lock; + static std::map ShareHandles GUARDED_BY(curl_share_lock); + static std::map ShareLocks GUARDED_BY(curl_share_lock); + + std::thread::id ThreadId; + + private: + static void LockCurlShare(CURL* handle, curl_lock_data nLockData, curl_lock_access laccess, void* useptr) NO_THREAD_SAFETY_ANALYSIS; + static void UnlockCurlShare(CURL* handle, curl_lock_data nLockData, void* useptr) NO_THREAD_SAFETY_ANALYSIS; + static bool InitializeCurlShare(const CurlSharePtr& hShare, const ShareLocksPtr& ShareLock) REQUIRES(curl_share_lock); + + bool DestroyCurlShareHandle(); + CURLSH* GetCurlShareHandle(); + + public: + static bool SetDnsCache(bool isCache); + static bool SetSslSessionCache(bool isCache); + static bool SetCurlShareHandle(CURL* hCurl); + static bool DestroyCurlShareHandleForThread(); + + // constructor/destructor + explicit S3fsCurlShare(); + ~S3fsCurlShare() = default; + S3fsCurlShare(const S3fsCurlShare&) = delete; + S3fsCurlShare(S3fsCurlShare&&) = delete; + S3fsCurlShare& operator=(const S3fsCurlShare&) = delete; + S3fsCurlShare& operator=(S3fsCurlShare&&) = delete; +}; + +#endif // S3FS_CURL_SHARE_H_ + +/* +* Local variables: +* tab-width: 4 +* c-basic-offset: 4 +* End: +* vim600: expandtab sw=4 ts=4 fdm=marker +* vim<600: expandtab sw=4 ts=4 +*/ diff --git a/src/s3fs.cpp b/src/s3fs.cpp index ee26c9452f..a2ccd8a2a4 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -43,6 +43,7 @@ #include "fdcache_auto.h" #include "fdcache_stat.h" #include "curl.h" +#include "curl_share.h" #include "curl_util.h" #include "s3objlist.h" #include "cache.h" @@ -4935,11 +4936,11 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar return 0; } else if(0 == strcmp(arg, "nodnscache")){ - S3fsCurl::SetDnsCache(false); + S3fsCurlShare::SetDnsCache(false); return 0; } else if(0 == strcmp(arg, "nosscache")){ - S3fsCurl::SetSslSessionCache(false); + S3fsCurlShare::SetSslSessionCache(false); return 0; } else if(is_prefix(arg, "parallel_count=") || is_prefix(arg, "parallel_upload=")){ diff --git a/src/s3fs_threadreqs.cpp b/src/s3fs_threadreqs.cpp index 6dff902af8..251330ae1d 100644 --- a/src/s3fs_threadreqs.cpp +++ b/src/s3fs_threadreqs.cpp @@ -36,7 +36,7 @@ // // Thread Worker function for head request // -void* head_req_threadworker(void* arg) +void* head_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam || !pthparam->pmeta){ @@ -44,7 +44,8 @@ void* head_req_threadworker(void* arg) } S3FS_PRN_INFO3("Head Request [path=%s][pmeta=%p]", pthparam->path.c_str(), pthparam->pmeta); - S3fsCurl s3fscurl; + s3fscurl.SetUseAhbe(false); + pthparam->result = s3fscurl.HeadRequest(pthparam->path.c_str(), *(pthparam->pmeta)); return reinterpret_cast(pthparam->result); @@ -53,7 +54,7 @@ void* head_req_threadworker(void* arg) // // Thread Worker function for multi head request // -void* multi_head_req_threadworker(void* arg) +void* multi_head_req_threadworker(S3fsCurl& s3fscurl, void* arg) { std::unique_ptr pthparam(static_cast(arg)); if(!pthparam || !pthparam->psyncfiller || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->pnotfound_list || !pthparam->presult){ @@ -72,8 +73,9 @@ void* multi_head_req_threadworker(void* arg) } } + s3fscurl.SetUseAhbe(false); + // loop for head request - S3fsCurl s3fscurl; int result = 0; headers_t meta; // this value is not used while(true){ @@ -197,7 +199,7 @@ void* multi_head_req_threadworker(void* arg) // // Thread Worker function for delete request // -void* delete_req_threadworker(void* arg) +void* delete_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam){ @@ -205,7 +207,8 @@ void* delete_req_threadworker(void* arg) } S3FS_PRN_INFO3("Delete Request [path=%s]", pthparam->path.c_str()); - S3fsCurl s3fscurl; + s3fscurl.SetUseAhbe(false); + pthparam->result = s3fscurl.DeleteRequest(pthparam->path.c_str()); return reinterpret_cast(pthparam->result); @@ -214,7 +217,7 @@ void* delete_req_threadworker(void* arg) // // Thread Worker function for put head request // -void* put_head_req_threadworker(void* arg) +void* put_head_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam){ @@ -222,7 +225,8 @@ void* put_head_req_threadworker(void* arg) } S3FS_PRN_INFO3("Put Head Request [path=%s][meta count=%lu][is copy=%s]", pthparam->path.c_str(), pthparam->meta.size(), (pthparam->isCopy ? "true" : "false")); - S3fsCurl s3fscurl(true); + s3fscurl.SetUseAhbe(true); + pthparam->result = s3fscurl.PutHeadRequest(pthparam->path.c_str(), pthparam->meta, pthparam->isCopy); return reinterpret_cast(pthparam->result); @@ -231,7 +235,7 @@ void* put_head_req_threadworker(void* arg) // // Thread Worker function for put request // -void* put_req_threadworker(void* arg) +void* put_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam){ @@ -239,7 +243,8 @@ void* put_req_threadworker(void* arg) } S3FS_PRN_INFO3("Put Request [path=%s][meta count=%lu][fd=%d][use_ahbe=%s]", pthparam->path.c_str(), pthparam->meta.size(), pthparam->fd, (pthparam->ahbe ? "true" : "false")); - S3fsCurl s3fscurl(pthparam->ahbe); + s3fscurl.SetUseAhbe(pthparam->ahbe); + pthparam->result = s3fscurl.PutRequest(pthparam->path.c_str(), pthparam->meta, pthparam->fd); return reinterpret_cast(pthparam->result); @@ -248,7 +253,7 @@ void* put_req_threadworker(void* arg) // // Thread Worker function for list bucket request // -void* list_bucket_req_threadworker(void* arg) +void* list_bucket_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam || !(pthparam->presponseBody)){ @@ -256,7 +261,8 @@ void* list_bucket_req_threadworker(void* arg) } S3FS_PRN_INFO3("List Bucket Request [path=%s][query=%s]", pthparam->path.c_str(), pthparam->query.c_str()); - S3fsCurl s3fscurl; + s3fscurl.SetUseAhbe(false); + if(0 == (pthparam->result = s3fscurl.ListBucketRequest(pthparam->path.c_str(), pthparam->query.c_str()))){ *(pthparam->presponseBody) = s3fscurl.GetBodyData(); } @@ -266,7 +272,7 @@ void* list_bucket_req_threadworker(void* arg) // // Thread Worker function for check service request // -void* check_service_req_threadworker(void* arg) +void* check_service_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam || !(pthparam->presponseCode) || !(pthparam->presponseBody)){ @@ -274,7 +280,8 @@ void* check_service_req_threadworker(void* arg) } S3FS_PRN_INFO3("Check Service Request [path=%s][support compat dir=%s][force No SSE=%s]", pthparam->path.c_str(), (pthparam->support_compat_dir ? "true" : "false"), (pthparam->forceNoSSE ? "true" : "false")); - S3fsCurl s3fscurl; + s3fscurl.SetUseAhbe(false); + if(0 == (pthparam->result = s3fscurl.CheckBucket(pthparam->path.c_str(), pthparam->support_compat_dir, pthparam->forceNoSSE))){ *(pthparam->presponseCode) = s3fscurl.GetLastResponseCode(); *(pthparam->presponseBody) = s3fscurl.GetBodyData(); @@ -285,7 +292,7 @@ void* check_service_req_threadworker(void* arg) // // Worker function for pre multipart upload request // -void* pre_multipart_upload_req_threadworker(void* arg) +void* pre_multipart_upload_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam){ @@ -293,7 +300,8 @@ void* pre_multipart_upload_req_threadworker(void* arg) } S3FS_PRN_INFO3("Pre Multipart Upload Request [path=%s][meta count=%lu]", pthparam->path.c_str(), pthparam->meta.size()); - S3fsCurl s3fscurl(true); + s3fscurl.SetUseAhbe(true); + pthparam->result = s3fscurl.PreMultipartUploadRequest(pthparam->path.c_str(), pthparam->meta, pthparam->upload_id); return reinterpret_cast(pthparam->result); @@ -302,7 +310,7 @@ void* pre_multipart_upload_req_threadworker(void* arg) // // Worker function for pre multipart upload part request // -void* multipart_upload_part_req_threadworker(void* arg) +void* multipart_upload_part_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam || !pthparam->pthparam_lock || !pthparam->petag || !pthparam->presult){ @@ -321,11 +329,12 @@ void* multipart_upload_part_req_threadworker(void* arg) } } + s3fscurl.SetUseAhbe(true); + // // Request // - S3fsCurl s3fscurl(true); - int result; + int result; if(0 != (result = s3fscurl.MultipartUploadPartRequest(pthparam->path.c_str(), pthparam->upload_fd, pthparam->start, pthparam->size, pthparam->part_num, pthparam->upload_id, pthparam->petag, pthparam->is_copy))){ S3FS_PRN_ERR("Failed Multipart Upload Part Worker with error(%d) [path=%s][upload_id=%s][upload_fd=%d][start=%lld][size=%lld][is_copy=%s][part_num=%d]", result, pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->upload_fd, static_cast(pthparam->start), static_cast(pthparam->size), (pthparam->is_copy ? "true" : "false"), pthparam->part_num); } @@ -342,7 +351,7 @@ void* multipart_upload_part_req_threadworker(void* arg) // // Worker function for complete multipart upload request // -void* complete_multipart_upload_threadworker(void* arg) +void* complete_multipart_upload_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam){ @@ -350,7 +359,8 @@ void* complete_multipart_upload_threadworker(void* arg) } S3FS_PRN_INFO3("Complete Multipart Upload Request [path=%s][upload id=%s][etaglist=%lu]", pthparam->path.c_str(), pthparam->upload_id.c_str(), pthparam->etaglist.size()); - S3fsCurl s3fscurl(true); + s3fscurl.SetUseAhbe(true); + pthparam->result = s3fscurl.MultipartUploadComplete(pthparam->path.c_str(), pthparam->upload_id, pthparam->etaglist); return reinterpret_cast(pthparam->result); @@ -359,7 +369,7 @@ void* complete_multipart_upload_threadworker(void* arg) // // Worker function for abort multipart upload request // -void* abort_multipart_upload_req_threadworker(void* arg) +void* abort_multipart_upload_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam){ @@ -367,7 +377,8 @@ void* abort_multipart_upload_req_threadworker(void* arg) } S3FS_PRN_INFO3("Abort Multipart Upload Request [path=%s][upload id=%s]", pthparam->path.c_str(), pthparam->upload_id.c_str()); - S3fsCurl s3fscurl(true); + s3fscurl.SetUseAhbe(true); + pthparam->result = s3fscurl.AbortMultipartUpload(pthparam->path.c_str(), pthparam->upload_id); return reinterpret_cast(pthparam->result); @@ -376,7 +387,7 @@ void* abort_multipart_upload_req_threadworker(void* arg) // // Thread Worker function for get object request // -void* get_object_req_threadworker(void* arg) +void* get_object_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam){ @@ -390,7 +401,8 @@ void* get_object_req_threadworker(void* arg) S3FS_PRN_WARN("Failed to get SSE type for file(%s).", pthparam->path.c_str()); } - S3fsCurl s3fscurl; + s3fscurl.SetUseAhbe(false); + pthparam->result = s3fscurl.GetObjectRequest(pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, ssetype, ssevalue); return reinterpret_cast(pthparam->result); @@ -399,7 +411,7 @@ void* get_object_req_threadworker(void* arg) // // Thread Worker function for multipart put head request // -void* multipart_put_head_req_threadworker(void* arg) +void* multipart_put_head_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam || !pthparam->ppartdata || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->presult){ @@ -418,8 +430,9 @@ void* multipart_put_head_req_threadworker(void* arg) } } - S3fsCurl s3fscurl(true); - int result = 0; + s3fscurl.SetUseAhbe(true); + + int result = 0; while(true){ // Request result = s3fscurl.MultipartPutHeadRequest(pthparam->from, pthparam->to, pthparam->part_number, pthparam->upload_id, pthparam->meta); @@ -516,7 +529,7 @@ void* multipart_put_head_req_threadworker(void* arg) // // Thread Worker function for parallel get object request // -void* parallel_get_object_req_threadworker(void* arg) +void* parallel_get_object_req_threadworker(S3fsCurl& s3fscurl, void* arg) { auto* pthparam = static_cast(arg); if(!pthparam || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->presult){ @@ -535,8 +548,9 @@ void* parallel_get_object_req_threadworker(void* arg) } } - S3fsCurl s3fscurl(true); - int result = 0; + s3fscurl.SetUseAhbe(true); + + int result = 0; while(true){ // Request result = s3fscurl.GetObjectRequest(pthparam->path.c_str(), pthparam->fd, pthparam->start, pthparam->size, pthparam->ssetype, pthparam->ssevalue); diff --git a/src/s3fs_threadreqs.h b/src/s3fs_threadreqs.h index 6432e2666e..da50b455f3 100644 --- a/src/s3fs_threadreqs.h +++ b/src/s3fs_threadreqs.h @@ -209,20 +209,20 @@ struct get_object_req_thparam //------------------------------------------------------------------- // Thread Worker functions for MultiThread Request //------------------------------------------------------------------- -void* head_req_threadworker(void* arg); -void* multi_head_req_threadworker(void* arg); -void* delete_req_threadworker(void* arg); -void* put_head_req_threadworker(void* arg); -void* put_req_threadworker(void* arg); -void* list_bucket_req_threadworker(void* arg); -void* check_service_req_threadworker(void* arg); -void* pre_multipart_upload_req_threadworker(void* arg); -void* multipart_upload_part_req_threadworker(void* arg); -void* complete_multipart_upload_threadworker(void* arg); -void* abort_multipart_upload_req_threadworker(void* arg); -void* multipart_put_head_req_threadworker(void* arg); -void* parallel_get_object_req_threadworker(void* arg); -void* get_object_req_threadworker(void* arg); +void* head_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* multi_head_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* delete_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* put_head_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* put_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* list_bucket_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* check_service_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* pre_multipart_upload_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* multipart_upload_part_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* complete_multipart_upload_threadworker(S3fsCurl& s3fscurl, void* arg); +void* abort_multipart_upload_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* multipart_put_head_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* parallel_get_object_req_threadworker(S3fsCurl& s3fscurl, void* arg); +void* get_object_req_threadworker(S3fsCurl& s3fscurl, void* arg); //------------------------------------------------------------------- // Utility functions diff --git a/src/threadpoolman.cpp b/src/threadpoolman.cpp index df0b2be383..6c36db5baa 100644 --- a/src/threadpoolman.cpp +++ b/src/threadpoolman.cpp @@ -28,6 +28,8 @@ #include "s3fs_logger.h" #include "threadpoolman.h" +#include "curl.h" +#include "curl_share.h" //------------------------------------------------ // ThreadPoolMan class variables @@ -105,6 +107,9 @@ void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise promise) } S3FS_PRN_INFO3("Start worker thread in ThreadPoolMan."); + // The only object in this thread worker + S3fsCurl s3fscurl(true); + while(!psingleton->IsExit()){ // wait psingleton->thpoolman_sem.acquire(); @@ -113,6 +118,12 @@ void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise promise) break; } + // reset curl handle + if(!s3fscurl.CreateCurlHandle(true)){ + S3FS_PRN_ERR("Failed to re-create curl handle."); + break; + } + // get instruction thpoolman_param param; { @@ -127,8 +138,9 @@ void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise promise) } } - void* retval = param.pfunc(param.args); - if(nullptr != retval){ + // run function + void* retval; + if(nullptr != (retval = param.pfunc(s3fscurl, param.args))){ S3FS_PRN_WARN("The instruction function returned with something error code(%ld).", reinterpret_cast(retval)); } if(param.psem){ @@ -136,6 +148,10 @@ void ThreadPoolMan::Worker(ThreadPoolMan* psingleton, std::promise promise) } } + if(!S3fsCurlShare::DestroyCurlShareHandleForThread()){ + S3FS_PRN_WARN("Failed to destory curl share handle for this thread, but continue..."); + } + promise.set_value(0); } diff --git a/src/threadpoolman.h b/src/threadpoolman.h index 22e4d6e969..7704b5d603 100644 --- a/src/threadpoolman.h +++ b/src/threadpoolman.h @@ -33,10 +33,12 @@ //------------------------------------------------ // Typedefs for functions and structures //------------------------------------------------ +class S3fsCurl; + // // Prototype function // -typedef void* (*thpoolman_worker)(void*); +typedef void* (*thpoolman_worker)(S3fsCurl&, void*); // // Parameter structure diff --git a/test/run_tests_using_sanitizers.sh b/test/run_tests_using_sanitizers.sh index 46878caf37..97e322fbde 100755 --- a/test/run_tests_using_sanitizers.sh +++ b/test/run_tests_using_sanitizers.sh @@ -49,7 +49,7 @@ ALL_TESTS=1 ASAN_OPTIONS='detect_leaks=1,detect_stack_use_after_return=1' make c make clean ./configure CXX=clang++ CXXFLAGS="$COMMON_FLAGS -fsanitize=thread" make --jobs="$(nproc)" -ALL_TESTS=1 TSAN_OPTIONS='halt_on_error=1,suppressions=threadsanitizer_suppressions.txt' make check -C test/ +ALL_TESTS=1 TSAN_OPTIONS='halt_on_error=1' make check -C test/ # run tests under UndefinedBehaviorSanitizer, https://clang.llvm.org/docs/UndefinedBehaviorSanitizer.html make clean diff --git a/test/threadsanitizer_suppressions.txt b/test/threadsanitizer_suppressions.txt deleted file mode 100644 index 2e7cd8347a..0000000000 --- a/test/threadsanitizer_suppressions.txt +++ /dev/null @@ -1 +0,0 @@ -race:OPENSSL_sk_free