diff --git a/ThreadPool.cpp b/ThreadPool.cpp new file mode 100644 index 0000000..393c722 --- /dev/null +++ b/ThreadPool.cpp @@ -0,0 +1,62 @@ +// +// Purpose: Simple thread pool +// +// Based on https://github.com/progschj/ThreadPool changes provided as https://github.com/calthron/ThreadPool + +#include "ThreadPool.hpp" + +ThreadPool::ThreadPool (size_t threads) +{ + workers.reserve (threads); + + for (size_t count = 0; count < threads; ++count) + { + // Worker execution loop + workers.emplace_back ([this]() + { + for (;;) + { + // Task to execute + std::function task; + + // Wait for additional work signal + { // CRITICAL SECTION + // Wait to be notified of work + lock_t lock (queue_mutex); + condition.wait (lock, [this]() + { + return stop || !tasks.empty (); + }); + + // If stopping and no work remains, exit the work loop + if (stop && tasks.empty ()) + break; + + // Dequeue the next task + task.swap (tasks.front ()); + tasks.pop (); + } // END CRITICAL SECTION + + // Execute + task (); + } + }); + } +} + +// Destructor joins all worker threads +ThreadPool::~ThreadPool () +{ + { // Critical section + lock_t lock (queue_mutex); + stop = true; + } // End critical section + + condition.notify_all (); + + // Wait for threads to complete work + for (std::thread &worker : workers) + { + worker.join(); + } +} diff --git a/ThreadPool.h b/ThreadPool.h deleted file mode 100644 index 4183203..0000000 --- a/ThreadPool.h +++ /dev/null @@ -1,98 +0,0 @@ -#ifndef THREAD_POOL_H -#define THREAD_POOL_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -class ThreadPool { -public: - ThreadPool(size_t); - template - auto enqueue(F&& f, Args&&... args) - -> std::future::type>; - ~ThreadPool(); -private: - // need to keep track of threads so we can join them - std::vector< std::thread > workers; - // the task queue - std::queue< std::function > tasks; - - // synchronization - std::mutex queue_mutex; - std::condition_variable condition; - bool stop; -}; - -// the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads) - : stop(false) -{ - for(size_t i = 0;i task; - - { - std::unique_lock lock(this->queue_mutex); - this->condition.wait(lock, - [this]{ return this->stop || !this->tasks.empty(); }); - if(this->stop && this->tasks.empty()) - return; - task = std::move(this->tasks.front()); - this->tasks.pop(); - } - - task(); - } - } - ); -} - -// add new work item to the pool -template -auto ThreadPool::enqueue(F&& f, Args&&... args) - -> std::future::type> -{ - using return_type = typename std::result_of::type; - - auto task = std::make_shared< std::packaged_task >( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); - { - std::unique_lock lock(queue_mutex); - - // don't allow enqueueing after stopping the pool - if(stop) - throw std::runtime_error("enqueue on stopped ThreadPool"); - - tasks.emplace([task](){ (*task)(); }); - } - condition.notify_one(); - return res; -} - -// the destructor joins all threads -inline ThreadPool::~ThreadPool() -{ - { - std::unique_lock lock(queue_mutex); - stop = true; - } - condition.notify_all(); - for(std::thread &worker: workers) - worker.join(); -} - -#endif diff --git a/ThreadPool.hpp b/ThreadPool.hpp new file mode 100644 index 0000000..74693fa --- /dev/null +++ b/ThreadPool.hpp @@ -0,0 +1,76 @@ +// +// Purpose: Simple thread pool +// +// Based on https://github.com/progschj/ThreadPool changes provided as https://github.com/calthron/ThreadPool + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class ThreadPool final +{ + public: + // Launches specified number of worker threads + ThreadPool (size_t threads = 1); + ~ThreadPool (); + + // Not copyable + ThreadPool (const ThreadPool &) = delete; + ThreadPool& operator= (const ThreadPool &) = delete; + + // Not moveable + ThreadPool (ThreadPool &&) = delete; + ThreadPool& operator= (const ThreadPool &&) = delete; + + // Enqueue task and return std::future<> + template + auto enqueue (Callable&& callable, Args&&... args) + -> std::future::type>; + + private: + // Keep track of threads, so they can be joined + std::vector workers; + // Task queue + std::queue> tasks; + + // Synchronization + using lock_t = std::unique_lock; + std::mutex queue_mutex; + std::condition_variable condition; + bool stop = false; +}; + +// Add a new work item to the pool, return std::future of return type +template +auto ThreadPool::enqueue (Callable&& callable, Args&&... args) + -> std::future::type> +{ + using return_t = typename std::result_of::type; + using task_t = std::packaged_task; + + auto task = std::make_shared (std::bind (std::forward (callable), std::forward (args)...)); + std::future result = task->get_future(); + + { // Critical section + lock_t lock (queue_mutex); + + // Don't allow an enqueue after stopping + if (stop) + throw std::runtime_error ("enqueue on stopped ThreadPool"); + + // Push work back on the queue + tasks.emplace ([task](){ (*task)(); }); + } // End critical section + + // Notify a thread that there is new work to perform + condition.notify_one (); + return result; +}