Skip to content

Commit

Permalink
Merge pull request #1249 from trapexit/tpfix
Browse files Browse the repository at this point in the history
Fix thread pool destruction where threads don't explicitly exit themselves
  • Loading branch information
trapexit authored Sep 9, 2023
2 parents dd7e9e2 + 766b923 commit e09b10c
Show file tree
Hide file tree
Showing 13 changed files with 49 additions and 5,244 deletions.
3 changes: 1 addition & 2 deletions libfuse/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ SRC_C = \
lib/fuse_session.c \
lib/fuse_signals.c \
lib/helper.c \
lib/mount.c \
lib/syslog.c
lib/mount.c
SRC_CPP = \
lib/format.cpp \
lib/os.cpp \
Expand Down
File renamed without changes.
54 changes: 30 additions & 24 deletions libfuse/lib/thread_pool.hpp → libfuse/include/thread_pool.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "moodycamel/blockingconcurrentqueue.h"
#include "syslog.h"

#include <atomic>
#include <csignal>
Expand All @@ -14,6 +13,7 @@
#include <thread>
#include <vector>

#include <syslog.h>

struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
Expand All @@ -35,10 +35,11 @@ class ThreadPool
: _queue(queue_depth_,thread_count_,thread_count_),
_name(get_thread_name(name_))
{
syslog_debug("threadpool: spawning %zu threads of queue depth %zu named '%s'",
thread_count_,
queue_depth_,
_name.c_str());
syslog(LOG_DEBUG,
"threadpool: spawning %zu threads of queue depth %zu named '%s'",
thread_count_,
queue_depth_,
_name.c_str());

sigset_t oldset;
sigset_t newset;
Expand All @@ -55,9 +56,10 @@ class ThreadPool
rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
if(rv != 0)
{
syslog_warning("threadpool: error spawning thread - %d (%s)",
rv,
strerror(rv));
syslog(LOG_WARNING,
"threadpool: error spawning thread - %d (%s)",
rv,
strerror(rv));
continue;
}

Expand All @@ -75,17 +77,18 @@ class ThreadPool

~ThreadPool()
{
syslog_debug("threadpool: destroying %zu threads named '%s'",
_threads.size(),
_name.c_str());
syslog(LOG_DEBUG,
"threadpool: destroying %zu threads named '%s'",
_threads.size(),
_name.c_str());

auto func = []() { pthread_exit(NULL); };
for(std::size_t i = 0; i < _threads.size(); i++)
_queue.enqueue(func);

for(auto t : _threads)
pthread_cancel(t);

Func f;
while(_queue.try_dequeue(f))
continue;

for(auto t : _threads)
pthread_join(t,NULL);
}
Expand Down Expand Up @@ -142,9 +145,10 @@ class ThreadPool

if(rv != 0)
{
syslog_warning("threadpool: error spawning thread - %d (%s)",
rv,
strerror(rv));
syslog(LOG_WARNING,
"threadpool: error spawning thread - %d (%s)",
rv,
strerror(rv));
return -rv;
}

Expand All @@ -156,9 +160,10 @@ class ThreadPool
_threads.push_back(t);
}

syslog_debug("threadpool: 1 thread added to pool '%s' named '%s'",
_name.c_str(),
name.c_str());
syslog(LOG_DEBUG,
"threadpool: 1 thread added to pool '%s' named '%s'",
_name.c_str(),
name.c_str());

return 0;
}
Expand Down Expand Up @@ -195,9 +200,10 @@ class ThreadPool

char name[16];
pthread_getname_np(t,name,sizeof(name));
syslog_debug("threadpool: 1 thread removed from pool '%s' named '%s'",
_name.c_str(),
name);
syslog(LOG_DEBUG,
"threadpool: 1 thread removed from pool '%s' named '%s'",
_name.c_str(),
name);

pthread_exit(NULL);
};
Expand Down
7 changes: 4 additions & 3 deletions libfuse/lib/fuse.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <sys/param.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <syslog.h>
#include <time.h>
#include <unistd.h>

Expand Down Expand Up @@ -3899,7 +3900,7 @@ fuse_invalidate_all_nodes()
{
struct fuse *f = fuse_get_fuse_obj();

syslog_info("invalidating file entries");
syslog(LOG_INFO,"invalidating file entries");

pthread_mutex_lock(&f->lock);
for(int i = 0; i < f->id_table.size; i++)
Expand All @@ -3925,7 +3926,7 @@ fuse_invalidate_all_nodes()
void
fuse_gc()
{
syslog_info("running thorough garbage collection");
syslog(LOG_INFO,"running thorough garbage collection");
node_gc();
msgbuf_gc();
fuse_malloc_trim();
Expand All @@ -3934,7 +3935,7 @@ fuse_gc()
void
fuse_gc1()
{
syslog_info("running basic garbage collection");
syslog(LOG_INFO,"running basic garbage collection");
node_gc1();
msgbuf_gc_10percent();
fuse_malloc_trim();
Expand Down
25 changes: 14 additions & 11 deletions libfuse/lib/fuse_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "fmt/core.h"
#include "make_unique.hpp"
#include "scope_guard.hpp"
#include "syslog.h"
#include "thread_pool.hpp"

#include "fuse_i.h"
Expand All @@ -26,6 +25,7 @@
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <syslog.h>
#include <unistd.h>

#include <cassert>
Expand Down Expand Up @@ -449,7 +449,9 @@ pin_threads(const std::vector<pthread_t> read_threads_,
if(type_ == "R1PPSP")
return ::pin_threads_R1PPSP(read_threads_,process_threads_);

syslog_warning("Invalid pin-threads value, ignoring: %s",type_.c_str());
syslog(LOG_WARNING,
"Invalid pin-threads value, ignoring: %s",
type_.c_str());
}

static
Expand Down Expand Up @@ -510,15 +512,16 @@ fuse_session_loop_mt(struct fuse_session *se_,

::pin_threads(read_threads,process_threads,pin_threads_type_);

syslog_info("read-thread-count=%d; "
"process-thread-count=%d; "
"process-thread-queue-depth=%d; "
"pin-threads=%s;"
,
read_thread_count,
process_thread_count,
process_thread_queue_depth,
pin_threads_type_.c_str());
syslog(LOG_INFO,
"read-thread-count=%d; "
"process-thread-count=%d; "
"process-thread-queue-depth=%d; "
"pin-threads=%s;"
,
read_thread_count,
process_thread_count,
process_thread_queue_depth,
pin_threads_type_.c_str());

::wait(se_,&finished);

Expand Down
114 changes: 0 additions & 114 deletions libfuse/lib/syslog.c

This file was deleted.

31 changes: 0 additions & 31 deletions libfuse/lib/syslog.h

This file was deleted.

Loading

0 comments on commit e09b10c

Please sign in to comment.