Organized multi-threading related options
ggtakec committed Nov 26, 2024
1 parent 8f0c861 commit b8c1245
Showing 8 changed files with 48 additions and 76 deletions.
18 changes: 4 additions & 14 deletions doc/man/s3fs.1.in
@@ -197,15 +197,6 @@ s3fs is always using DNS cache, this option make DNS cache disable.
\fB\-o\fR nosscache - disable SSL session cache.
s3fs is always using SSL session cache, this option make SSL session cache disable.
.TP
\fB\-o\fR multireq_max (default="20")
maximum number of parallel request for listing objects.
.TP
\fB\-o\fR parallel_count (default="5")
number of parallel request for uploading big objects.
s3fs uploads large object (over 25MB by default) by multipart post request, and sends parallel requests.
This option limits parallel request count which s3fs requests at once.
It is necessary to set this value depending on a CPU and a network band.
.TP
\fB\-o\fR multipart_size (default="10")
part size, in MB, for each multipart request.
The minimum value is 5 MB and the maximum value is 5 GB.
@@ -296,10 +287,9 @@ If this option is enabled, a sequential upload will be performed in parallel wit
This is expected to give better performance than other upload functions.
Note that this option is still experimental and may change in the future.
.TP
\fB\-o\fR max_thread_count (default is "5")
Specifies the number of threads waiting for stream uploads.
Note that this option and Stream Upload are still experimental and subject to change in the future.
This option will be merged with "parallel_count" in the future.
\fB\-o\fR max_thread_count (default is "10")
Specifies the maximum number of parallel requests to send, which is also the degree of parallelism for HEAD requests, multipart uploads and stream uploads.
s3fs starts this number of worker threads to process requests.
.TP
\fB\-o\fR enable_content_md5 (default is disable)
Allow S3 server to check data integrity of uploads via the Content-MD5 header.
@@ -522,7 +512,7 @@ s3fs is a multi-threaded application. Depending on the workload it may use multi
.TP
.SS Performance of S3 requests
.TP
s3fs provides several options (e.g. "\-o multipart_size", "\-o parallel_count") to control behaviour and thus indirectly the performance. The possible combinations of these options in conjunction with the various S3 backends are so varied that there is no individual recommendation other than the default values. Improved individual settings can be found by testing and measuring.
s3fs provides several options (e.g. "max_thread_count" option) to control behaviour and thus indirectly the performance. The possible combinations of these options in conjunction with the various S3 backends are so varied that there is no individual recommendation other than the default values. Improved individual settings can be found by testing and measuring.
.TP
The two options "Enable no object cache" ("\-o enable_noobj_cache") and "Disable support of alternative directory names" ("\-o notsup_compat_dir") can be used to control shared access to the same bucket by different applications:
.TP
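In practice, a mount that previously tuned parallelism with several knobs, for example "-o parallel_count=5 -o multireq_max=20", now uses the single option "-o max_thread_count=20" (the values here are purely illustrative); as the s3fs.cpp changes below show, the removed options are rejected with an error rather than silently ignored.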
16 changes: 0 additions & 16 deletions src/curl.cpp
@@ -129,8 +129,6 @@ std::map<const CURL*, curlprogress> S3fsCurl::curl_progress;
std::string S3fsCurl::curl_ca_bundle;
mimes_t S3fsCurl::mimeTypes;
std::string S3fsCurl::userAgent;
int S3fsCurl::max_parallel_cnt = 5; // default
int S3fsCurl::max_multireq = 20; // default
off_t S3fsCurl::multipart_size = MULTIPART_SIZE; // default
off_t S3fsCurl::multipart_copy_size = 512 * 1024 * 1024; // default
signature_type_t S3fsCurl::signature_type = signature_type_t::V2_OR_V4; // default
@@ -946,20 +944,6 @@ bool S3fsCurl::SetMultipartCopySize(off_t size)
return true;
}

int S3fsCurl::SetMaxParallelCount(int value)
{
int old = S3fsCurl::max_parallel_cnt;
S3fsCurl::max_parallel_cnt = value;
return old;
}

int S3fsCurl::SetMaxMultiRequest(int max)
{
int old = S3fsCurl::max_multireq;
S3fsCurl::max_multireq = max;
return old;
}

// [NOTE]
// This proxy setting is as same as the "--proxy" option of the curl command,
// and equivalent to the "CURLOPT_PROXY" option of the curl_easy_setopt()
7 changes: 0 additions & 7 deletions src/curl.h
@@ -151,7 +151,6 @@ class S3fsCurl
static std::string curl_ca_bundle;
static mimes_t mimeTypes;
static std::string userAgent;
static int max_parallel_cnt;
static int max_multireq;
static off_t multipart_size;
static off_t multipart_copy_size;
@@ -302,12 +301,6 @@ class S3fsCurl
static long GetSslVerifyHostname() { return S3fsCurl::ssl_verify_hostname; }
static bool SetSSLClientCertOptions(const std::string& values);
static void ResetOffset(S3fsCurl* pCurl);
// maximum parallel GET and PUT requests
static int SetMaxParallelCount(int value);
static int GetMaxParallelCount() { return S3fsCurl::max_parallel_cnt; }
// maximum parallel HEAD requests
static int SetMaxMultiRequest(int max);
static int GetMaxMultiRequest() { return S3fsCurl::max_multireq; }
static bool SetMultipartSize(off_t size);
static off_t GetMultipartSize() { return S3fsCurl::multipart_size; }
static bool SetMultipartCopySize(off_t size);
2 changes: 1 addition & 1 deletion src/fdcache_entity.cpp
@@ -2036,7 +2036,7 @@ ssize_t FdEntity::Read(int fd, char* bytes, off_t start, size_t size, bool force
// load size(for prefetch)
size_t load_size = size;
if(start + static_cast<ssize_t>(size) < pagelist.Size()){
ssize_t prefetch_max_size = std::max(static_cast<off_t>(size), S3fsCurl::GetMultipartSize() * S3fsCurl::GetMaxParallelCount());
ssize_t prefetch_max_size = std::max(static_cast<off_t>(size), S3fsCurl::GetMultipartSize() * ThreadPoolMan::GetWorkerCount());

if(start + prefetch_max_size < pagelist.Size()){
load_size = prefetch_max_size;
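The one-line change above sizes read-ahead from the thread pool's worker count instead of the removed parallel_count. Below is a minimal standalone sketch of that arithmetic using this commit's defaults; the 4 MB read size and the local variable names are assumptions for illustration, not s3fs internals.

    #include <algorithm>
    #include <cstdio>

    int main()
    {
        // Defaults taken from this commit: 10 MB multipart_size, 10 worker threads.
        long long multipart_size = 10LL * 1024 * 1024;
        long long worker_count   = 10;
        long long read_size      = 4LL * 1024 * 1024;   // hypothetical 4 MB read request

        // Same shape as the new expression in FdEntity::Read():
        // prefetch ceiling = max(requested size, multipart_size * worker count)
        long long prefetch_max_size = std::max(read_size, multipart_size * worker_count);

        std::printf("prefetch ceiling = %lld bytes (%lld MB)\n",
                    prefetch_max_size, prefetch_max_size / (1024 * 1024));
        return 0;
    }

With the defaults this raises the ceiling from the previous 10 MB * parallel_count (5) = 50 MB to 10 MB * 10 workers = 100 MB.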
28 changes: 8 additions & 20 deletions src/s3fs.cpp
@@ -106,7 +106,6 @@ static int max_keys_list_object = 1000;// default is 1000
static off_t max_dirty_data = 5LL * 1024LL * 1024LL * 1024LL;
static bool use_wtf8 = false;
static off_t fake_diskfree_size = -1; // default is not set(-1)
static int max_thread_count = 5; // default is 5
static bool update_parent_dir_stat= false; // default not updating parent directory stats
static fsblkcnt_t bucket_block_count; // advertised block count of the bucket
static unsigned long s3fs_block_size = 16 * 1024 * 1024; // s3fs block size is 16MB
@@ -4044,8 +4043,8 @@ static void* s3fs_init(struct fuse_conn_info* conn)
S3FS_PRN_DBG("Could not initialize cache directory.");
}

if(!ThreadPoolMan::Initialize(max_thread_count)){
S3FS_PRN_CRIT("Could not create thread pool(%d)", max_thread_count);
if(!ThreadPoolMan::Initialize()){
S3FS_PRN_CRIT("Could not create thread pool(%d)", ThreadPoolMan::GetWorkerCount());
s3fs_exit_fuseloop(EXIT_FAILURE);
}

@@ -4691,11 +4690,6 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar
is_remove_cache = true;
return 0;
}
else if(is_prefix(arg, "multireq_max=")){
int maxreq = static_cast<int>(cvt_strtoofft(strchr(arg, '=') + sizeof(char), /*base=*/ 10));
S3fsCurl::SetMaxMultiRequest(maxreq);
return 0;
}
else if(0 == strcmp(arg, "nonempty")){
nonempty = true;
return 1; // need to continue for fuse.
@@ -4941,23 +4935,17 @@ static int my_fuse_opt_proc(void* data, const char* arg, int key, struct fuse_ar
S3fsCurlShare::SetSslSessionCache(false);
return 0;
}
else if(is_prefix(arg, "parallel_count=") || is_prefix(arg, "parallel_upload=")){
int maxpara = static_cast<int>(cvt_strtoofft(strchr(arg, '=') + sizeof(char), /*base=*/ 10));
if(0 >= maxpara){
S3FS_PRN_EXIT("argument should be over 1: parallel_count");
return -1;
}
S3fsCurl::SetMaxParallelCount(maxpara);
return 0;
else if(is_prefix(arg, "multireq_max=") || is_prefix(arg, "parallel_count=") || is_prefix(arg, "parallel_upload=")){
S3FS_PRN_EXIT("this option is Obsoleted, the max_thread_count option can be used instead.");
return -1;
}
else if(is_prefix(arg, "max_thread_count=")){
int max_thcount = static_cast<int>(cvt_strtoofft(strchr(arg, '=') + sizeof(char), /*base=*/ 10));
if(0 >= max_thcount){
S3FS_PRN_EXIT("argument should be over 1: max_thread_count");
return -1;
}
max_thread_count = max_thcount;
S3FS_PRN_WARN("The max_thread_count option is not a formal option. Please note that it will change in the future.");
ThreadPoolMan::SetWorkerCount(max_thcount);
return 0;
}
else if(is_prefix(arg, "fd_page_size=")){
@@ -5610,12 +5598,12 @@ int main(int argc, char* argv[])
}

// Check free disk space for maultipart request
if(!FdManager::IsSafeDiskSpace(nullptr, S3fsCurl::GetMultipartSize() * S3fsCurl::GetMaxParallelCount())){
if(!FdManager::IsSafeDiskSpace(nullptr, S3fsCurl::GetMultipartSize() * ThreadPoolMan::GetWorkerCount())){
// Try to clean cache dir and retry
S3FS_PRN_WARN("No enough disk space for s3fs, try to clean cache dir");
FdManager::get()->CleanupCacheDir();

if(!FdManager::IsSafeDiskSpace(nullptr, S3fsCurl::GetMultipartSize() * S3fsCurl::GetMaxParallelCount(), true)){
if(!FdManager::IsSafeDiskSpace(nullptr, S3fsCurl::GetMultipartSize() * ThreadPoolMan::GetWorkerCount(), true)){
S3fsCurl::DestroyS3fsCurl();
s3fs_destroy_global_ssl();
exit(EXIT_FAILURE);
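Taken together, the s3fs.cpp changes amount to: obsolete options fail fast, max_thread_count is forwarded to ThreadPoolMan::SetWorkerCount(), and s3fs_init() later starts the pool through ThreadPoolMan::Initialize() with no argument. The compressed sketch below mirrors that flow with stand-in names (handle_option and g_worker_count are hypothetical, not real s3fs symbols).

    #include <cstdio>
    #include <cstdlib>
    #include <cstring>

    static int g_worker_count = 10;                       // stands in for ThreadPoolMan::worker_count

    // Stand-in for the option branches in my_fuse_opt_proc(): 0 = handled, -1 = error, 1 = pass to FUSE.
    static int handle_option(const char* arg)
    {
        if(0 == std::strncmp(arg, "multireq_max=", 13) ||
           0 == std::strncmp(arg, "parallel_count=", 15) ||
           0 == std::strncmp(arg, "parallel_upload=", 16)){
            std::fprintf(stderr, "this option is obsolete; use the max_thread_count option instead.\n");
            return -1;
        }
        if(0 == std::strncmp(arg, "max_thread_count=", 17)){
            long value = std::strtol(std::strchr(arg, '=') + 1, nullptr, 10);
            if(value <= 0){
                std::fprintf(stderr, "argument should be over 1: max_thread_count\n");
                return -1;
            }
            g_worker_count = static_cast<int>(value);     // the real code calls ThreadPoolMan::SetWorkerCount()
            return 0;
        }
        return 1;                                         // unknown option: pass through to FUSE
    }

    int main()
    {
        handle_option("max_thread_count=16");
        handle_option("parallel_count=5");                // now rejected with an error message
        std::printf("thread pool will start %d workers\n", g_worker_count);
        return 0;
    }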
22 changes: 6 additions & 16 deletions src/s3fs_help.cpp
@@ -239,17 +239,6 @@ static constexpr char help_string[] =
" - s3fs is always using SSL session cache, this option make SSL \n"
" session cache disable.\n"
"\n"
" multireq_max (default=\"20\")\n"
" - maximum number of parallel request for listing objects.\n"
"\n"
" parallel_count (default=\"5\")\n"
" - number of parallel request for uploading big objects.\n"
" s3fs uploads large object (over 20MB) by multipart upload request, \n"
" and sends parallel requests.\n"
" This option limits parallel request count which s3fs requests \n"
" at once. It is necessary to set this value depending on a CPU \n"
" and a network band.\n"
"\n"
" multipart_size (default=\"10\")\n"
" - part size, in MB, for each multipart request.\n"
" The minimum value is 5 MB and the maximum value is 5 GB.\n"
@@ -357,11 +346,12 @@ static constexpr char help_string[] =
" Note that this option is still experimental and may change in the\n"
" future.\n"
"\n"
" max_thread_count (default is \"5\")\n"
" - Specifies the number of threads waiting for stream uploads.\n"
" Note that this option and Stream Upload are still experimental\n"
" and subject to change in the future.\n"
" This option will be merged with \"parallel_count\" in the future.\n"
" max_thread_count (default is \"10\")\n"
" - This value is the maximum number of parallel requests to be\n"
" sent, and the number of parallel processes for head requests,\n"
" multipart uploads and stream uploads.\n"
" Worker threads will be started to process requests according to\n"
" this value.\n"
"\n"
" enable_content_md5 (default is disable)\n"
" - Allow S3 server to check data integrity of uploads via the\n"
26 changes: 25 additions & 1 deletion src/threadpoolman.cpp
@@ -34,6 +34,7 @@
//------------------------------------------------
// ThreadPoolMan class variables
//------------------------------------------------
int ThreadPoolMan::worker_count = 10; // default
std::unique_ptr<ThreadPoolMan> ThreadPoolMan::singleton;

//------------------------------------------------
@@ -45,7 +46,11 @@ bool ThreadPoolMan::Initialize(int count)
S3FS_PRN_CRIT("Already singleton for Thread Manager exists.");
abort();
}
ThreadPoolMan::singleton.reset(new ThreadPoolMan(count));
if(-1 != count){
ThreadPoolMan::SetWorkerCount(count);
}
ThreadPoolMan::singleton.reset(new ThreadPoolMan(ThreadPoolMan::worker_count));

return true;
}

@@ -54,6 +59,25 @@ void ThreadPoolMan::Destroy()
ThreadPoolMan::singleton.reset();
}

int ThreadPoolMan::SetWorkerCount(int count)
{
if(0 >= count){
S3FS_PRN_ERR("Thread worker count(%d) must be positive number.", count);
return -1;
}
if(count == ThreadPoolMan::worker_count){
return ThreadPoolMan::worker_count;
}

// [TODO]
// If we need to dynamically change worker threads, this is
// where we would terminate/add workers.
//
int old = ThreadPoolMan::worker_count;
ThreadPoolMan::worker_count = count;
return old;
}

bool ThreadPoolMan::Instruct(const thpoolman_param& param)
{
if(!ThreadPoolMan::singleton){
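The new SetWorkerCount()/Initialize() pair follows a familiar pattern: a static default that is overridden only when a caller supplies an explicit, positive count. Here is a self-contained sketch of that pattern; PoolConfig is a stand-in class, not s3fs code.

    #include <cstdio>

    class PoolConfig
    {
        private:
            static int worker_count;                      // analogous to ThreadPoolMan::worker_count

        public:
            static int SetWorkerCount(int count)
            {
                if(0 >= count){
                    return -1;                            // reject non-positive values
                }
                int old = worker_count;
                worker_count = count;
                return old;                               // return the previous value, like the real setter
            }
            static int GetWorkerCount() { return worker_count; }
            static bool Initialize(int count = -1)
            {
                if(-1 != count){                          // only override when an explicit count is passed
                    SetWorkerCount(count);
                }
                // ...this is where worker_count threads would be started...
                return true;
            }
    };

    int PoolConfig::worker_count = 10;                    // default, matching this commit

    int main()
    {
        PoolConfig::SetWorkerCount(16);                   // e.g. parsed from "-o max_thread_count=16"
        PoolConfig::Initialize();                         // keeps 16 instead of resetting to the default
        std::printf("workers=%d\n", PoolConfig::GetWorkerCount());
        return 0;
    }

Because Initialize() now defaults its argument to -1, option parsing can record the count long before the pool exists, and s3fs_init() simply starts the pool with whatever value was stored.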
5 changes: 4 additions & 1 deletion src/threadpoolman.h
@@ -63,6 +63,7 @@ typedef std::list<thpoolman_param> thpoolman_params_t;
class ThreadPoolMan
{
private:
static int worker_count;
static std::unique_ptr<ThreadPoolMan> singleton;

std::atomic<bool> is_exit;
@@ -91,8 +92,10 @@ class ThreadPoolMan
ThreadPoolMan& operator=(const ThreadPoolMan&) = delete;
ThreadPoolMan& operator=(ThreadPoolMan&&) = delete;

static bool Initialize(int count);
static bool Initialize(int count = -1);
static void Destroy();
static int SetWorkerCount(int count);
static int GetWorkerCount() { return ThreadPoolMan::worker_count; }
static bool Instruct(const thpoolman_param& pparam);
static bool AwaitInstruct(const thpoolman_param& param);
};
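The header only declares the interface, so as context for what worker_count ultimately controls, here is a generic worker-pool sketch in the same spirit. This is not the s3fs implementation: TinyPool is a made-up name and thpoolman_param is replaced by std::function for brevity.

    #include <atomic>
    #include <condition_variable>
    #include <cstdio>
    #include <functional>
    #include <list>
    #include <mutex>
    #include <thread>
    #include <vector>

    class TinyPool
    {
        private:
            std::atomic<bool>                is_exit{false};
            std::mutex                       mtx;
            std::condition_variable          cond;
            std::list<std::function<void()>> tasks;       // stands in for thpoolman_params_t
            std::vector<std::thread>         workers;

        public:
            explicit TinyPool(int worker_count)
            {
                for(int i = 0; i < worker_count; ++i){
                    workers.emplace_back([this](){
                        for(;;){
                            std::function<void()> task;
                            {
                                std::unique_lock<std::mutex> lock(mtx);
                                cond.wait(lock, [this](){ return is_exit || !tasks.empty(); });
                                if(is_exit && tasks.empty()){
                                    return;               // drained and told to exit
                                }
                                task = tasks.front();
                                tasks.pop_front();
                            }
                            task();                       // run the request outside the lock
                        }
                    });
                }
            }
            ~TinyPool()
            {
                {
                    std::lock_guard<std::mutex> lock(mtx);
                    is_exit = true;
                }
                cond.notify_all();
                for(auto& th : workers){ th.join(); }
            }
            void Instruct(std::function<void()> task)     // loosely analogous to ThreadPoolMan::Instruct()
            {
                {
                    std::lock_guard<std::mutex> lock(mtx);
                    tasks.push_back(std::move(task));
                }
                cond.notify_one();
            }
    };

    int main()
    {
        TinyPool pool(10);                                // default max_thread_count in this commit
        for(int i = 0; i < 4; ++i){
            pool.Instruct([i](){ std::printf("request %d handled\n", i); });
        }
        return 0;                                         // destructor drains the queue and joins the workers
    }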
