Skip to content

Commit

Permalink
adding per-thread stack size control
Browse files Browse the repository at this point in the history
  • Loading branch information
gpertea committed May 11, 2016
1 parent 393d80b commit e94679a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 27 deletions.
58 changes: 43 additions & 15 deletions gclib/GThreads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ freely, subject to the following restrictions:
#elif defined(_GTHREADS_WIN32_)
#include <process.h>
#endif

#include <string.h>

//namespace tthread {

Expand Down Expand Up @@ -146,6 +146,16 @@ static thread::id _pthread_t_to_ID(const pthread_t &aHandle)
#endif // _GTHREADS_POSIX_
*/

void gthreads_errExit(int err, const char* msg) {
if (msg!=NULL)
fprintf(stderr, "GThreads Error: %s (%s)\n", msg, strerror(err));
else
fprintf(stderr, "GThreads Error: %s\n", strerror(err));
exit(EXIT_FAILURE);
}



void GThread::update_counter(int inc, GThread* t_update) {
static GMutex counterLock;
GLockGuard<GMutex> guard(counterLock);
Expand All @@ -158,7 +168,7 @@ void GThread::update_counter(int inc, GThread* t_update) {
t_update->mId=0; // thread terminated

}


//------------------------------------------------------------------------------
// thread
Expand Down Expand Up @@ -240,7 +250,7 @@ void * GThread::wrapper_function(void * aArg)
}


void GThread::initStart(void* tidata) {
void GThread::initStart(void* tidata, size_t stacksize) {
_thread_start_info * ti = (_thread_start_info *) tidata;
/*ti->mFunction = aFunction;
ti->mArg = aArg;
Expand All @@ -253,10 +263,29 @@ void GThread::initStart(void* tidata) {
#if defined(_GTHREADS_WIN32_)
mHandle = (HANDLE) _beginthreadex(0, 0, wrapper_function, (void *) ti, 0, &mWin32ThreadID);
#elif defined(_GTHREADS_POSIX_)
if(pthread_create(&mHandle, NULL, wrapper_function, (void *) ti) != 0)
mHandle = 0;
#endif
if (stacksize>0) {
pthread_attr_t attr;
int r=pthread_attr_init(&attr);
if (r!=0) gthreads_errExit(r, "pthread_attr_init()");
r = pthread_attr_setstacksize(&attr, stacksize);
if (r!=0) gthreads_errExit(r, "pthread_attr_setstacksize()");
stack_size=stacksize;
r=pthread_create(&mHandle, &attr, wrapper_function, (void *) ti);
if (r!=0) {
gthreads_errExit(r, "pthread_create()");
//mHandle = 0;
}
r=pthread_attr_destroy(&attr);
if (r!=0) gthreads_errExit(r, "pthread_attr_destroy()");
}
else {
int r=pthread_create(&mHandle, NULL, wrapper_function, (void *) ti);
if (r!= 0)
gthreads_errExit(r, "pthread_create()");
//mHandle = 0;
}

#endif
// Did we fail to create the thread?
if(!mHandle)
{
Expand All @@ -266,39 +295,38 @@ void GThread::initStart(void* tidata) {
else GThread::update_counter(1, this);
}

//GThread::GThread(void (*aFunction)(void *, GThread*), void * aArg)
GThread::GThread(void (*aFunction)(void *), void * aArg): mId(0), mHandle(0), mNotAThread(true)
GThread::GThread(void (*aFunction)(void *), void * aArg, size_t stacksize): mId(0), mHandle(0), mNotAThread(true)
#if defined(_GTHREADS_WIN32_)
, mWin32ThreadID(0)
#endif
{
kickStart(aFunction, aArg);
kickStart(aFunction, aArg, stacksize);
}

void GThread::kickStart(void (*aFunction)(void *), void * aArg) {
void GThread::kickStart(void (*aFunction)(void *), void * aArg, size_t stacksize) {
// Serialize access to this thread structure
GLockGuard<GMutex> guard(mDataMutex);
// Fill out the thread startup information (passed to the thread wrapper,
// which will eventually free it)
_thread_start_info * ti = new _thread_start_info(this, aFunction, aArg);
initStart(ti);
initStart(ti, stacksize);
}

//custom alternate constructor (non-C++11 compatible), passing GThreadData back to the
//user function in order to easily retrieve current GThread object
//(better alternative to this_thread)
GThread::GThread(void (*gFunction)(GThreadData& thread_data), void * aArg) {
kickStart(gFunction, aArg);
GThread::GThread(void (*gFunction)(GThreadData& thread_data), void * aArg, size_t stacksize) {
kickStart(gFunction, aArg, stacksize);
}

void GThread::kickStart(void (*gFunction)(GThreadData& thread_data), void * aArg) {
void GThread::kickStart(void (*gFunction)(GThreadData& thread_data), void * aArg, size_t stacksize) {
// Serialize access to this thread structure
GLockGuard<GMutex> guard(mDataMutex);

// Fill out the thread startup information (passed to the thread wrapper,
// which will eventually free it)
_thread_start_info * ti = new _thread_start_info(this, gFunction, aArg);
initStart(ti);
initStart(ti, stacksize);
}

GThread::~GThread()
Expand Down
33 changes: 23 additions & 10 deletions gclib/GThreads.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ freely, subject to the following restrictions:
#include <sched.h>
#include <unistd.h>
#endif

// Generic includes
//#include <ostream>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

/// TinyThread++ version (major number).
#define TINYTHREAD_VERSION_MAJOR 1
Expand Down Expand Up @@ -174,6 +174,10 @@ freely, subject to the following restrictions:
/// the std::mutex class.
//namespace tthread {

void gthreads_errExit(int err, const char* msg=NULL);

#define pthreads_err(msg) \
do { perror(msg); exit(EXIT_FAILURE); } while (0)
/// GMutex class
/// This is a mutual exclusion object for synchronizing access to shared
/// memory areas for several threads. The mutex is non-recursive (i.e. a
Expand Down Expand Up @@ -696,6 +700,7 @@ class GThread {
#endif
private:
int mId;
size_t stack_size; //available only for pthreads
static int tcounter; //counts live, joinable GThread instances
static int num_created; //counts all joinable GThread instances ever created by current process

Expand All @@ -709,7 +714,7 @@ class GThread {
/// Default constructor.
/// Construct a thread object without an associated thread of execution
/// (i.e. non-joinable).
GThread() : mId(0), mHandle(0), mNotAThread(true)
GThread(size_t stacksize=0) : mId(0), stack_size(stacksize), mHandle(0), mNotAThread(true)
#if defined(_GTHREADS_WIN32_)
, mWin32ThreadID(0)
#endif
Expand All @@ -724,12 +729,12 @@ class GThread {
/// thread class. It is more similar to the pthread_create() (POSIX) and
/// CreateThread() (Windows) functions.
//GThread(void (*aFunction)(void *, GThread*), void * aArg);
GThread(void (*aFunction)(void *), void * aArg=NULL);
GThread(void (*aFunction)(void *), void * aArg=NULL, size_t stacksize=0);

GThread(void (*aFunction)(GThreadData& thread_data), void * aArg);
GThread(void (*aFunction)(GThreadData& thread_data), void * aArg, size_t stacksize=0);

void kickStart(void (*aFunction)(GThreadData& thread_data), void * aArg);
void kickStart(void (*aFunction)(void *), void * aArg=NULL);
void kickStart(void (*aFunction)(GThreadData& thread_data), void * aArg, size_t stacksize=0);
void kickStart(void (*aFunction)(void *), void * aArg=NULL, size_t stacksize=0);

/// Destructor.
/// @note If the thread is joinable upon destruction, \c std::terminate()
Expand All @@ -752,7 +757,7 @@ class GThread {
void detach();
/// Return the thread ID of a thread object.
int get_id() const; // { return mID; }

size_t getStackSize() { return stack_size; } //only for pthreads
/// Get the native handle for this thread.
/// @note Under Windows, this is a \c HANDLE, and under POSIX systems, this
/// is a \c pthread_t.
Expand All @@ -775,6 +780,14 @@ class GThread {
int r=tcounter;
return r;
}
static size_t defaultStackSize() {
pthread_attr_t attr;
size_t stacksize;
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr, &stacksize);
pthread_attr_destroy(&attr);
return stacksize;
}
static int liveCount() {
//return number of running (live) threads
return num_running();
Expand All @@ -790,7 +803,7 @@ class GThread {
_GTHREADS_DISABLE_ASSIGNMENT(GThread)

private:
void initStart(void* tidata);
void initStart(void* tidata, size_t stacksize=0);
static void update_counter(int inc=1, GThread* t_update=NULL); //default: increments
// This is the internal thread wrapper function.
#if defined(_GTHREADS_WIN32_)
Expand Down
15 changes: 13 additions & 2 deletions stringtie.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,17 @@ const char* ERR_BAM_SORT="\nError: the input alignment file is not sorted!\n";
if (ballgown)
Ballgown_setupFiles(f_tdata, f_edata, f_idata, f_e2t, f_i2t);
#ifndef NOTHREADS
#define DEF_TSTACK_SIZE 8388608
int tstackSize=GThread::defaultStackSize();
size_t defStackSize=0;
if (tstackSize<DEF_TSTACK_SIZE) defStackSize=DEF_TSTACK_SIZE;
if (verbose) {
if (defStackSize>0){
int ssize=defStackSize;
GMessage("Default stack size for threads: %d (increased to %d)\n", tstackSize, ssize);
}
else GMessage("Default stack size for threads: %d\n", tstackSize);
}
GThread* threads=new GThread[num_cpus]; //bundle processing threads

GPVec<BundleData> bundleQueue(false); //queue of loaded bundles
Expand All @@ -394,7 +405,7 @@ if (ballgown)

dataClear.setCapacity(num_cpus+1);
for (int b=0;b<num_cpus;b++) {
threads[b].kickStart(workerThread, (void*) &bundleQueue);
threads[b].kickStart(workerThread, (void*) &bundleQueue, defStackSize);
bundles[b+1].idx=b+1;
dataClear.Push(b);
}
Expand Down Expand Up @@ -1230,8 +1241,8 @@ void processBundle(BundleData* bundle) {
NumFrag3+=bundle->num_fragments3;
SumFrag3+=bundle->sum_fragments3;
fprintf(stderr,"Number of fragments in bundle: %g with length %g\n",bundle->num_fragments,bundle->frag_len);
*/
fprintf(stderr,"Number of fragments in bundle: %g with sum %g\n",bundle->num_fragments,bundle->frag_len);
*/
GMessage("^bundle %s:%d-%d(%d) done (%d processed potential transcripts).\n",bundle->refseq.chars(),
bundle->start, bundle->end, bundle->readlist.Count(), bundle->pred.Count());
#ifdef GMEMTRACE
Expand Down

0 comments on commit e94679a

Please sign in to comment.