diff --git a/src/Makefile.am b/src/Makefile.am index 3bb2b1bef2..9780b83018 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -55,6 +55,7 @@ s3fs_SOURCES = \ addhead.cpp \ sighandlers.cpp \ threadpoolman.cpp \ + syncfiller.cpp \ common_auth.cpp if USE_SSL_OPENSSL s3fs_SOURCES += openssl_auth.cpp diff --git a/src/curl.cpp b/src/curl.cpp index eb4b43cdab..d6978c6d8f 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -710,6 +710,11 @@ int S3fsCurl::SetRetries(int count) return old; } +int S3fsCurl::GetRetries() +{ + return S3fsCurl::retries; +} + bool S3fsCurl::SetPublicBucket(bool flag) { bool old = S3fsCurl::is_public_bucket; @@ -2134,8 +2139,6 @@ bool S3fsCurl::ClearInternalData() // type = REQTYPE::UNSET; path = ""; - base_path = ""; - saved_path = ""; url = ""; op = ""; query_string= ""; @@ -2146,7 +2149,6 @@ bool S3fsCurl::ClearInternalData() responseHeaders.clear(); bodydata.clear(); headdata.clear(); - LastResponseCode = S3FSCURL_RESPONSECODE_NOTSET; postdata = nullptr; postdata_remaining = 0; retry_count = 0; @@ -3308,14 +3310,12 @@ bool S3fsCurl::AddSseRequestHead(sse_type_t ssetype, const std::string& input, b // // tpath : target path for head request -// bpath : saved into base_path -// savedpath : saved into saved_path // ssekey_pos : -1 means "not" SSE-C type // 0 - X means SSE-C type and position for SSE-C key(0 is latest key) // -bool S3fsCurl::PreHeadRequest(const char* tpath, const char* bpath, const char* savedpath, size_t ssekey_pos) +bool S3fsCurl::PreHeadRequest(const char* tpath, size_t ssekey_pos) { - S3FS_PRN_INFO3("[tpath=%s][bpath=%s][save=%s][sseckeypos=%zu]", SAFESTRPTR(tpath), SAFESTRPTR(bpath), SAFESTRPTR(savedpath), ssekey_pos); + S3FS_PRN_INFO3("[tpath=%s][sseckeypos=%zu]", SAFESTRPTR(tpath), ssekey_pos); if(!tpath){ return false; @@ -3327,8 +3327,6 @@ bool S3fsCurl::PreHeadRequest(const char* tpath, const char* bpath, const char* // libcurl 7.17 does deep copy of url, deep copy "stable" url url = prepare_url(turl.c_str()); path = get_realpath(tpath); - base_path = SAFESTRPTR(bpath); - saved_path = SAFESTRPTR(savedpath); requestHeaders = nullptr; responseHeaders.clear(); @@ -3364,7 +3362,7 @@ int S3fsCurl::HeadRequest(const char* tpath, headers_t& meta) if(!DestroyCurlHandle()){ break; } - if(!PreHeadRequest(tpath, nullptr, nullptr, pos)){ + if(!PreHeadRequest(tpath, pos)){ break; } if(!fpLazySetup || !fpLazySetup(this)){ diff --git a/src/curl.h b/src/curl.h index 089e42f33c..c6e5277f28 100644 --- a/src/curl.h +++ b/src/curl.h @@ -97,6 +97,8 @@ typedef std::vector sseckeylist_t; // class S3fsCurl { + // [TODO] + // If S3fsMultiCurl is discontinued, the following friends will be deleted. friend class S3fsMultiCurl; private: @@ -175,8 +177,6 @@ class S3fsCurl CurlUniquePtr hCurl PT_GUARDED_BY(curl_handles_lock) = {nullptr, curl_easy_cleanup}; REQTYPE type; // type of request std::string path; // target object path - std::string base_path; // base path (for multi curl head request) - std::string saved_path; // saved path = cache key (for multi curl head request) std::string url; // target object path(url) struct curl_slist* requestHeaders; headers_t responseHeaders; // header data by HeaderCallback @@ -187,7 +187,7 @@ class S3fsCurl off_t postdata_remaining; // use by post method and read callback function. filepart partdata; // use by multipart upload/get object callback bool is_use_ahbe; // additional header by extension - int retry_count; // retry count for multipart + int retry_count; // retry count for multipart ([TODO] If S3fsMultiCurl is discontinued, this variable will be deleted.) std::unique_ptr b_infile = {nullptr, &s3fs_fclose}; // backup for retrying const unsigned char* b_postdata; // backup for retrying off_t b_postdata_remaining; // backup for retrying @@ -273,6 +273,10 @@ class S3fsCurl void insertIBMIAMHeaders(const std::string& access_key_id, const std::string& access_token); bool insertAuthHeaders(); bool AddSseRequestHead(sse_type_t ssetype, const std::string& ssevalue, bool is_copy); + bool PreHeadRequest(const char* tpath, size_t ssekey_pos = -1); + bool PreHeadRequest(const std::string& tpath, size_t ssekey_pos = -1) { + return PreHeadRequest(tpath.c_str(), ssekey_pos); + } std::string CalcSignatureV2(const std::string& method, const std::string& strMD5, const std::string& content_type, const std::string& date, const std::string& resource, const std::string& secret_access_key, const std::string& access_token); std::string CalcSignature(const std::string& method, const std::string& canonical_uri, const std::string& query_string, const std::string& strdate, const std::string& payload_hash, const std::string& date8601, const std::string& secret_access_key, const std::string& access_token); int MultipartUploadPartSetup(const char* tpath, int part_num, const std::string& upload_id); @@ -301,6 +305,7 @@ class S3fsCurl static time_t SetReadwriteTimeout(time_t timeout); static time_t GetReadwriteTimeout() { return S3fsCurl::readwrite_timeout; } static int SetRetries(int count); + static int GetRetries(); static bool SetPublicBucket(bool flag); static bool IsPublicBucket() { return S3fsCurl::is_public_bucket; } static acl_t SetDefaultAcl(acl_t acl); @@ -367,10 +372,6 @@ class S3fsCurl int RequestPerform(bool dontAddAuthHeaders=false); int DeleteRequest(const char* tpath); int GetIAMv2ApiToken(const char* token_url, int token_ttl, const char* token_ttl_hdr, std::string& response); - bool PreHeadRequest(const char* tpath, const char* bpath = nullptr, const char* savedpath = nullptr, size_t ssekey_pos = -1); - bool PreHeadRequest(const std::string& tpath, const std::string& bpath, const std::string& savedpath, size_t ssekey_pos = -1) { - return PreHeadRequest(tpath.c_str(), bpath.c_str(), savedpath.c_str(), ssekey_pos); - } int HeadRequest(const char* tpath, headers_t& meta); int PutHeadRequest(const char* tpath, const headers_t& meta, bool is_copy); int PutRequest(const char* tpath, headers_t& meta, int fd); @@ -390,8 +391,6 @@ class S3fsCurl // methods(variables) const std::string& GetPath() const { return path; } - const std::string& GetBasePath() const { return base_path; } - const std::string& GetSpecialSavedPath() const { return saved_path; } const std::string& GetUrl() const { return url; } const std::string& GetOp() const { return op; } const headers_t* GetResponseHeaders() const { return &responseHeaders; } @@ -403,10 +402,6 @@ class S3fsCurl bool EnableUseAhbe() { return SetUseAhbe(true); } bool DisableUseAhbe() { return SetUseAhbe(false); } bool IsUseAhbe() const { return is_use_ahbe; } - int GetMultipartRetryCount() const { return retry_count; } - void SetMultipartRetryCount(int retrycnt) { retry_count = retrycnt; } - bool IsOverMultipartRetryCount() const { return (retry_count >= S3fsCurl::retries); } - size_t GetLastPreHeadSeecKeyPos() const { return b_ssekey_pos; } }; #endif // S3FS_CURL_H_ diff --git a/src/fdcache_entity.cpp b/src/fdcache_entity.cpp index d245556404..76f339e8c9 100644 --- a/src/fdcache_entity.cpp +++ b/src/fdcache_entity.cpp @@ -31,6 +31,7 @@ #include #include "common.h" +#include "s3fs.h" #include "fdcache_entity.h" #include "fdcache_fdinfo.h" #include "fdcache_stat.h" diff --git a/src/fdcache_fdinfo.cpp b/src/fdcache_fdinfo.cpp index 32d9c9dd6c..5ca83518a8 100644 --- a/src/fdcache_fdinfo.cpp +++ b/src/fdcache_fdinfo.cpp @@ -29,6 +29,7 @@ #include #include "common.h" +#include "s3fs.h" #include "s3fs_logger.h" #include "s3fs_util.h" #include "fdcache_fdinfo.h" diff --git a/src/mpu_util.cpp b/src/mpu_util.cpp index 838e5862fc..3b3998afce 100644 --- a/src/mpu_util.cpp +++ b/src/mpu_util.cpp @@ -22,6 +22,7 @@ #include #include +#include "s3fs.h" #include "s3fs_logger.h" #include "mpu_util.h" #include "curl.h" diff --git a/src/s3fs.cpp b/src/s3fs.cpp index 04bdb5474c..1e05688a3a 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -126,8 +126,6 @@ static int check_object_access(const char* path, int mask, struct stat* pstbuf); static int check_object_owner(const char* path, struct stat* pstbuf); static int check_parent_object_access(const char* path, int mask); static int get_local_fent(AutoFdEntity& autoent, FdEntity **entity, const char* path, int flags = O_RDONLY, bool is_load = false); -static bool multi_head_callback(S3fsCurl* s3fscurl, void* param); -static std::unique_ptr multi_head_retry_callback(S3fsCurl* s3fscurl); static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf, fuse_fill_dir_t filler); static int list_bucket(const char* path, S3ObjList& head, const char* delimiter, bool check_content_only = false); static int directory_empty(const char* path); @@ -208,66 +206,6 @@ static int s3fs_removexattr(const char* path, const char* name); // The flag is accessed from child threads, so std::atomic is used for exclusive control of flags. static std::atomic has_mp_stat; -// -// A synchronous class that calls the fuse_fill_dir_t function that processes the readdir data -// -class SyncFiller -{ - private: - mutable std::mutex filler_lock; - void* filler_buff; - fuse_fill_dir_t filler_func; - std::set filled; - - public: - explicit SyncFiller(void* buff = nullptr, fuse_fill_dir_t filler = nullptr); - ~SyncFiller() = default; - SyncFiller(const SyncFiller&) = delete; - SyncFiller(SyncFiller&&) = delete; - SyncFiller& operator=(const SyncFiller&) = delete; - SyncFiller& operator=(SyncFiller&&) = delete; - - int Fill(const char *name, const struct stat *stbuf, off_t off); - int SufficiencyFill(const std::vector& pathlist); -}; - -SyncFiller::SyncFiller(void* buff, fuse_fill_dir_t filler) : filler_buff(buff), filler_func(filler) -{ - if(!filler_buff || !filler_func){ - S3FS_PRN_CRIT("Internal error: SyncFiller constructor parameter is critical value."); - abort(); - } -} - -// -// See. prototype fuse_fill_dir_t in fuse.h -// -int SyncFiller::Fill(const char *name, const struct stat *stbuf, off_t off) -{ - const std::lock_guard lock(filler_lock); - - int result = 0; - if(filled.insert(name).second){ - result = filler_func(filler_buff, name, stbuf, off); - } - return result; -} - -int SyncFiller::SufficiencyFill(const std::vector& pathlist) -{ - const std::lock_guard lock(filler_lock); - - int result = 0; - for(auto it = pathlist.cbegin(); it != pathlist.cend(); ++it) { - if(filled.insert(*it).second){ - if(0 != filler_func(filler_buff, it->c_str(), nullptr, 0)){ - result = 1; - } - } - } - return result; -} - //------------------------------------------------------------------- // Functions //------------------------------------------------------------------- @@ -3148,141 +3086,25 @@ static int s3fs_opendir(const char* _path, struct fuse_file_info* fi) return result; } -// [TODO] -// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl -// -// cppcheck-suppress unmatchedSuppression -// cppcheck-suppress constParameterCallback -static bool multi_head_callback(S3fsCurl* s3fscurl, void* param) -{ - if(!s3fscurl){ - return false; - } - - // Add stat cache - const std::string& saved_path = s3fscurl->GetSpecialSavedPath(); - if(!StatCache::getStatCacheData()->AddStat(saved_path, *(s3fscurl->GetResponseHeaders()))){ - S3FS_PRN_ERR("failed adding stat cache [path=%s]", saved_path.c_str()); - return false; - } - - // Get stats from stats cache(for converting from meta), and fill - std::string bpath = mybasename(saved_path); - if(use_wtf8){ - bpath = s3fs_wtf8_decode(bpath); - } - if(param){ - auto* pcbparam = reinterpret_cast(param); - struct stat st; - if(StatCache::getStatCacheData()->GetStat(saved_path, &st)){ - pcbparam->Fill(bpath.c_str(), &st, 0); - }else{ - S3FS_PRN_INFO2("Could not find %s file in stat cache.", saved_path.c_str()); - pcbparam->Fill(bpath.c_str(), nullptr, 0); - } - }else{ - S3FS_PRN_WARN("param(multi_head_callback_param*) is nullptr, then can not call filler."); - } - - return true; -} - -struct multi_head_notfound_callback_param -{ - std::mutex list_lock; - s3obj_list_t notfound_list; -}; - -// [TODO] -// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl -// -static bool multi_head_notfound_callback(S3fsCurl* s3fscurl, void* param) -{ - if(!s3fscurl){ - return false; - } - S3FS_PRN_INFO("HEAD returned NotFound(404) for %s object, it maybe only the path exists and the object does not exist.", s3fscurl->GetPath().c_str()); - - if(!param){ - S3FS_PRN_WARN("param(multi_head_notfound_callback_param*) is nullptr, then can not call filler."); - return false; - } - - // set path to not found list - auto* pcbparam = reinterpret_cast(param); - - const std::lock_guard lock(pcbparam->list_lock); - pcbparam->notfound_list.push_back(s3fscurl->GetBasePath()); - - return true; -} - -// [TODO] -// This function's argument(s3fscurl) will be checked and changed after removing S3fsMultiCurl -// -static std::unique_ptr multi_head_retry_callback(S3fsCurl* s3fscurl) -{ - if(!s3fscurl){ - return nullptr; - } - size_t ssec_key_pos= s3fscurl->GetLastPreHeadSeecKeyPos(); - int retry_count = s3fscurl->GetMultipartRetryCount(); - - // retry next sse key. - // if end of sse key, set retry master count is up. - ssec_key_pos = (ssec_key_pos == static_cast(-1) ? 0 : ssec_key_pos + 1); - if(0 == S3fsCurl::GetSseKeyCount() || S3fsCurl::GetSseKeyCount() <= ssec_key_pos){ - if(s3fscurl->IsOverMultipartRetryCount()){ - S3FS_PRN_ERR("Over retry count(%d) limit(%s).", s3fscurl->GetMultipartRetryCount(), s3fscurl->GetSpecialSavedPath().c_str()); - return nullptr; - } - ssec_key_pos = -1; - retry_count++; - } - - std::unique_ptr newcurl(new S3fsCurl(s3fscurl->IsUseAhbe())); - const std::string& path = s3fscurl->GetBasePath(); - const std::string& base_path = s3fscurl->GetBasePath(); - const std::string& saved_path = s3fscurl->GetSpecialSavedPath(); - - if(!newcurl->PreHeadRequest(path, base_path, saved_path, ssec_key_pos)){ - S3FS_PRN_ERR("Could not duplicate curl object(%s).", saved_path.c_str()); - return nullptr; - } - newcurl->SetMultipartRetryCount(retry_count); - - return newcurl; -} - static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf, fuse_fill_dir_t filler) { - // [TODO] - // This will be checked and changed after removing S3fsMultiCurl - // - S3fsMultiCurl curlmulti(S3fsCurl::GetMaxMultiRequest(), true); // [NOTE] run all requests to completion even if some requests fail. - s3obj_list_t headlist; - int result = 0; - - S3FS_PRN_INFO1("[path=%s][list=%zu]", path, headlist.size()); + S3FS_PRN_INFO1("[path=%s][head=<%s>][filler=%p]", path, head.IsEmpty() ? "empty" : "not empty", filler); // Make base path list. + s3obj_list_t headlist; head.GetNameList(headlist, true, false); // get name with "/". StatCache::getStatCacheData()->GetNotruncateCache(std::string(path), headlist); // Add notruncate file name from stat cache - // Initialize S3fsMultiCurl - curlmulti.SetSuccessCallback(multi_head_callback); - curlmulti.SetRetryCallback(multi_head_retry_callback); - - // Success Callback function parameter(SyncFiller object) + // Initialize SyncFiller object SyncFiller syncfiller(buf, filler); - curlmulti.SetSuccessCallbackParam(reinterpret_cast(&syncfiller)); - // Not found Callback function parameter - struct multi_head_notfound_callback_param notfound_param; - if(support_compat_dir){ - curlmulti.SetNotFoundCallback(multi_head_notfound_callback); - curlmulti.SetNotFoundCallbackParam(reinterpret_cast(¬found_param)); - } + // common variables + Semaphore multi_head_sem(0); + int req_count = 0; + int req_result = 0; + int retrycount = 0; + std::mutex thparam_lock; + s3obj_list_t notfound_list; // Make single head request(with max). for(auto iter = headlist.cbegin(); headlist.cend() != iter; ++iter){ @@ -3298,37 +3120,44 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf if(use_wtf8){ bpath = s3fs_wtf8_decode(bpath); } - syncfiller.Fill(bpath.c_str(), &st, 0); + syncfiller.Fill(bpath, &st, 0); continue; } - // First check for directory, start checking "not SSE-C". - // If checking failed, retry to check with "SSE-C" by retry callback func when SSE-C mode. - std::unique_ptr s3fscurl(new S3fsCurl()); - if(!s3fscurl->PreHeadRequest(disppath, disppath, disppath)){ // target path = cache key path.(ex "dir/") - S3FS_PRN_WARN("Could not make curl object for head request(%s).", disppath.c_str()); - continue; + // parameter for thread worker + auto* thargs = new multi_head_req_thparam; // free in multi_head_req_threadworker + thargs->psyncfiller = &syncfiller; + thargs->pthparam_lock = &thparam_lock; // for pretrycount and presult member + thargs->pretrycount = &retrycount; + thargs->pnotfound_list = ¬found_list; + thargs->use_wtf8 = use_wtf8; + thargs->path = disppath; + thargs->presult = &req_result; + + // make parameter for thread pool + thpoolman_param ppoolparam; + ppoolparam.args = thargs; + ppoolparam.psem = &multi_head_sem; + ppoolparam.pfunc = multi_head_req_threadworker; + + // setup instruction + if(!ThreadPoolMan::Instruct(ppoolparam)){ + S3FS_PRN_ERR("failed setup instruction for one header request."); + delete thargs; + return -EIO; } + ++req_count; + } - if(!curlmulti.SetS3fsCurlObject(std::move(s3fscurl))){ - S3FS_PRN_WARN("Could not make curl object into multi curl(%s).", disppath.c_str()); - continue; - } + // wait for finish all requests + while(req_count > 0){ + multi_head_sem.acquire(); + --req_count; } - headlist.clear(); - // Multi request - if(0 != (result = curlmulti.Request())){ - // If result is -EIO, it is something error occurred. - // This case includes that the object is encrypting(SSE) and s3fs does not have keys. - // So s3fs set result to 0 in order to continue the process. - if(-EIO == result){ - S3FS_PRN_WARN("error occurred in multi request(errno=%d), but continue...", result); - result = 0; - }else{ - S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result); - return result; - } + // print messages + if(0 != req_result){ + S3FS_PRN_DBG("Some head requests returned error, first error is %d.", req_result); } // [NOTE] @@ -3338,7 +3167,7 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf if(!support_compat_dir){ syncfiller.SufficiencyFill(head.GetCommonPrefixes()); } - if(support_compat_dir && !notfound_param.notfound_list.empty()){ // [NOTE] not need to lock to access this here. + if(support_compat_dir && !notfound_list.empty()){ // [NOTE] not need to lock to access this here. // dummy header mode_t dirmask = umask(0); // macos does not have getumask() umask(dirmask); @@ -3352,7 +3181,7 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf dummy_header["x-amz-meta-ctime"] = "0"; dummy_header["x-amz-meta-mtime"] = "0"; - for(auto reiter = notfound_param.notfound_list.cbegin(); reiter != notfound_param.notfound_list.cend(); ++reiter){ + for(auto reiter = notfound_list.cbegin(); reiter != notfound_list.cend(); ++reiter){ int dir_result; const std::string& dirpath = *reiter; if(-ENOTEMPTY == (dir_result = directory_empty(dirpath.c_str()))){ @@ -3361,17 +3190,17 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf // Add stat cache if(StatCache::getStatCacheData()->AddStat(dirpath, dummy_header, true)){ // set forcedir=true // Get stats from stats cache(for converting from meta), and fill - std::string base_path = mybasename(dirpath); + std::string bpath = mybasename(dirpath); if(use_wtf8){ - base_path = s3fs_wtf8_decode(base_path); + bpath = s3fs_wtf8_decode(bpath); } struct stat st; if(StatCache::getStatCacheData()->GetStat(dirpath, &st)){ - syncfiller.Fill(base_path.c_str(), &st, 0); + syncfiller.Fill(bpath, &st, 0); }else{ S3FS_PRN_INFO2("Could not find %s directory(no dir object) in stat cache.", dirpath.c_str()); - syncfiller.Fill(base_path.c_str(), nullptr, 0); + syncfiller.Fill(bpath, nullptr, 0); } }else{ S3FS_PRN_ERR("failed adding stat cache [path=%s], but dontinue...", dirpath.c_str()); @@ -3381,8 +3210,7 @@ static int readdir_multi_head(const char* path, const S3ObjList& head, void* buf } } } - - return result; + return 0; } static int s3fs_readdir(const char* _path, void* buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info* fi) diff --git a/src/s3fs_threadreqs.cpp b/src/s3fs_threadreqs.cpp index 33c2073545..c4fc86dd2f 100644 --- a/src/s3fs_threadreqs.cpp +++ b/src/s3fs_threadreqs.cpp @@ -19,10 +19,14 @@ */ #include "common.h" +#include "s3fs.h" #include "s3fs_threadreqs.h" #include "threadpoolman.h" #include "curl_util.h" #include "s3fs_logger.h" +#include "s3fs_util.h" +#include "cache.h" +#include "string_util.h" //------------------------------------------------------------------- // Thread Worker functions for MultiThread Request @@ -44,6 +48,150 @@ void* head_req_threadworker(void* arg) return reinterpret_cast(pthparam->result); } +// +// Thread Worker function for multi head request +// +void* multi_head_req_threadworker(void* arg) +{ + std::unique_ptr pthparam(static_cast(arg)); + if(!pthparam || !pthparam->psyncfiller || !pthparam->pthparam_lock || !pthparam->pretrycount || !pthparam->pnotfound_list || !pthparam->presult){ + return reinterpret_cast(-EIO); + } + + // Check retry max count and print debug message + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + + S3FS_PRN_INFO3("Multi Head Request [filler=%p][thparam_lock=%p][retrycount=%d][notfound_list=%p][wtf8=%s][path=%s]", pthparam->psyncfiller, pthparam->pthparam_lock, *(pthparam->pretrycount), pthparam->pnotfound_list, pthparam->use_wtf8 ? "true" : "false", pthparam->path.c_str()); + + if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){ + S3FS_PRN_ERR("Head request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount)); + return reinterpret_cast(-EIO); + } + } + + // loop for head request + S3fsCurl s3fscurl; + int result = 0; + headers_t meta; // this value is not used + while(true){ + // Request + result = s3fscurl.HeadRequest(pthparam->path.c_str(), meta); + + // Check result + bool isResetOffset= true; + CURLcode curlCode = s3fscurl.GetCurlCode(); + long responseCode = S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET; + s3fscurl.GetResponseCode(responseCode, false); + + if(CURLE_OK == curlCode){ + if(responseCode < 400){ + // add into stat cache + if(StatCache::getStatCacheData()->AddStat(pthparam->path, *(s3fscurl.GetResponseHeaders()))){ + // Get stats from stats cache(for converting from meta), and fill + std::string bpath = mybasename(pthparam->path); + if(pthparam->use_wtf8){ + bpath = s3fs_wtf8_decode(bpath); + } + + struct stat st; + if(StatCache::getStatCacheData()->GetStat(pthparam->path, &st)){ + pthparam->psyncfiller->Fill(bpath, &st, 0); + }else{ + S3FS_PRN_INFO2("Could not find %s file in stat cache.", pthparam->path.c_str()); + pthparam->psyncfiller->Fill(bpath, nullptr, 0); + } + result = 0; + }else{ + S3FS_PRN_ERR("failed adding stat cache [path=%s]", pthparam->path.c_str()); + if(0 == result){ + result = -EIO; + } + } + break; + + }else if(responseCode == 400){ + // as possibly in multipart + S3FS_PRN_WARN("Head Request(%s) got 400 response code.", pthparam->path.c_str()); + + }else if(responseCode == 404){ + // set path to not found list + S3FS_PRN_INFO("Head Request(%s) got NotFound(404), it maybe only the path exists and the object does not exist.", pthparam->path.c_str()); + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + pthparam->pnotfound_list->push_back(pthparam->path); + } + break; + + }else if(responseCode == 500){ + // case of all other result, do retry.(11/13/2013) + // because it was found that s3fs got 500 error from S3, but could success + // to retry it. + S3FS_PRN_WARN("Head Request(%s) got 500 response code.", pthparam->path.c_str()); + + // cppcheck-suppress unmatchedSuppression + // cppcheck-suppress knownConditionTrueFalse + }else if(responseCode == S3fsCurl::S3FSCURL_RESPONSECODE_NOTSET){ + // This is a case where the processing result has not yet been updated (should be very rare). + S3FS_PRN_WARN("Head Request(%s) could not get any response code.", pthparam->path.c_str()); + + }else{ // including S3fsCurl::S3FSCURL_RESPONSECODE_FATAL_ERROR + // Retry in other case. + S3FS_PRN_WARN("Head Request(%s) got fatal response code.", pthparam->path.c_str()); + } + + }else if(CURLE_OPERATION_TIMEDOUT == curlCode){ + S3FS_PRN_ERR("Head Request(%s) is timeouted.", pthparam->path.c_str()); + isResetOffset= false; + + }else if(CURLE_PARTIAL_FILE == curlCode){ + S3FS_PRN_WARN("Head Request(%s) is recieved data does not match the given size.", pthparam->path.c_str()); + isResetOffset= false; + + }else{ + S3FS_PRN_WARN("Head Request(%s) got the result code(%d: %s)", pthparam->path.c_str(), curlCode, curl_easy_strerror(curlCode)); + } + + // Check retry max count + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + + ++(*(pthparam->pretrycount)); + if(S3fsCurl::GetRetries() < *(pthparam->pretrycount)){ + S3FS_PRN_ERR("Head request(%s) reached the maximum number of retry count(%d).", pthparam->path.c_str(), *(pthparam->pretrycount)); + if(0 == result){ + result = -EIO; + } + break; + } + } + + // Setup for retry + if(isResetOffset){ + S3fsCurl::ResetOffset(&s3fscurl); + } + } + + // Set result code + { + const std::lock_guard lock(*(pthparam->pthparam_lock)); + if(0 == *(pthparam->presult) && 0 != result){ + // keep first error + *(pthparam->presult) = result; + } + } + + // [NOTE] + // The return value of a Multi Head request thread will always be 0(nullptr). + // This is because the expected value of a Head request will always be a + // response other than 200, such as 400/404/etc. + // In those error cases, this function simply outputs a message. And those + // errors(the first one) will be set to pthparam->presult and can be referenced + // by the caller. + // + return nullptr; +} + // // Thread Worker function for delete request // diff --git a/src/s3fs_threadreqs.h b/src/s3fs_threadreqs.h index 50e5005ba2..601b5eb1eb 100644 --- a/src/s3fs_threadreqs.h +++ b/src/s3fs_threadreqs.h @@ -26,6 +26,8 @@ #include "common.h" #include "metaheader.h" #include "curl.h" +#include "s3objlist.h" +#include "syncfiller.h" //------------------------------------------------------------------- // Structures for MultiThread Request @@ -40,6 +42,20 @@ struct head_req_thparam int result = 0; }; +// +// Multi Head Request parameter structure for Thread Pool. +// +struct multi_head_req_thparam +{ + std::string path; + SyncFiller* psyncfiller = nullptr; + std::mutex* pthparam_lock = nullptr; + int* pretrycount = nullptr; + s3obj_list_t* pnotfound_list = nullptr; + bool use_wtf8 = false; + int* presult = nullptr; +}; + // // Delete Request parameter structure for Thread Pool. // @@ -144,6 +160,7 @@ struct get_object_req_thparam // Thread Worker functions for MultiThread Request //------------------------------------------------------------------- void* head_req_threadworker(void* arg); +void* multi_head_req_threadworker(void* arg); void* delete_req_threadworker(void* arg); void* put_head_req_threadworker(void* arg); void* put_req_threadworker(void* arg); diff --git a/src/syncfiller.cpp b/src/syncfiller.cpp new file mode 100644 index 0000000000..01e62cbbf3 --- /dev/null +++ b/src/syncfiller.cpp @@ -0,0 +1,74 @@ +/* + * s3fs - FUSE-based file system backed by Amazon S3 + * + * Copyright(C) 2007 Randy Rizun + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#include +#include + +#include "s3fs_logger.h" +#include "syncfiller.h" + +//------------------------------------------------------------------- +// Class SyncFiller +//------------------------------------------------------------------- +SyncFiller::SyncFiller(void* buff, fuse_fill_dir_t filler) : filler_buff(buff), filler_func(filler) +{ + if(!filler_buff || !filler_func){ + S3FS_PRN_CRIT("Internal error: SyncFiller constructor parameter is critical value."); + abort(); + } +} + +// +// See. prototype fuse_fill_dir_t in fuse.h +// +int SyncFiller::Fill(const std::string& name, const struct stat *stbuf, off_t off) +{ + const std::lock_guard lock(filler_lock); + + int result = 0; + if(filled.insert(name).second){ + result = filler_func(filler_buff, name.c_str(), stbuf, off); + } + return result; +} + +int SyncFiller::SufficiencyFill(const std::vector& pathlist) +{ + const std::lock_guard lock(filler_lock); + + int result = 0; + for(auto it = pathlist.cbegin(); it != pathlist.cend(); ++it) { + if(filled.insert(*it).second){ + if(0 != filler_func(filler_buff, it->c_str(), nullptr, 0)){ + result = 1; + } + } + } + return result; +} + +/* +* Local variables: +* tab-width: 4 +* c-basic-offset: 4 +* End: +* vim600: expandtab sw=4 ts=4 fdm=marker +* vim<600: expandtab sw=4 ts=4 +*/ diff --git a/src/syncfiller.h b/src/syncfiller.h new file mode 100644 index 0000000000..8e4be6f74b --- /dev/null +++ b/src/syncfiller.h @@ -0,0 +1,67 @@ +/* + * s3fs - FUSE-based file system backed by Amazon S3 + * + * Copyright(C) 2007 Randy Rizun + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#ifndef SYNCFILLER_H_ +#define SYNCFILLER_H_ + +#include +#include +#include +#include + +#include "s3fs.h" + +//---------------------------------------------- +// class SyncFiller +//---------------------------------------------- +// +// A synchronous class that calls the fuse_fill_dir_t +// function that processes the readdir data +// +class SyncFiller +{ + private: + mutable std::mutex filler_lock; + void* filler_buff; + fuse_fill_dir_t filler_func; + std::set filled; + + public: + explicit SyncFiller(void* buff = nullptr, fuse_fill_dir_t filler = nullptr); + ~SyncFiller() = default; + SyncFiller(const SyncFiller&) = delete; + SyncFiller(SyncFiller&&) = delete; + SyncFiller& operator=(const SyncFiller&) = delete; + SyncFiller& operator=(SyncFiller&&) = delete; + + int Fill(const std::string& name, const struct stat *stbuf, off_t off); + int SufficiencyFill(const std::vector& pathlist); +}; + +#endif // SYNCFILLER_H_ + +/* +* Local variables: +* tab-width: 4 +* c-basic-offset: 4 +* End: +* vim600: expandtab sw=4 ts=4 fdm=marker +* vim<600: expandtab sw=4 ts=4 +*/