diff --git a/ThreadPool.h b/ThreadPool.h deleted file mode 100644 index 4183203..0000000 --- a/ThreadPool.h +++ /dev/null @@ -1,98 +0,0 @@ -#ifndef THREAD_POOL_H -#define THREAD_POOL_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -class ThreadPool { -public: - ThreadPool(size_t); - template - auto enqueue(F&& f, Args&&... args) - -> std::future::type>; - ~ThreadPool(); -private: - // need to keep track of threads so we can join them - std::vector< std::thread > workers; - // the task queue - std::queue< std::function > tasks; - - // synchronization - std::mutex queue_mutex; - std::condition_variable condition; - bool stop; -}; - -// the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads) - : stop(false) -{ - for(size_t i = 0;i task; - - { - std::unique_lock lock(this->queue_mutex); - this->condition.wait(lock, - [this]{ return this->stop || !this->tasks.empty(); }); - if(this->stop && this->tasks.empty()) - return; - task = std::move(this->tasks.front()); - this->tasks.pop(); - } - - task(); - } - } - ); -} - -// add new work item to the pool -template -auto ThreadPool::enqueue(F&& f, Args&&... args) - -> std::future::type> -{ - using return_type = typename std::result_of::type; - - auto task = std::make_shared< std::packaged_task >( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); - { - std::unique_lock lock(queue_mutex); - - // don't allow enqueueing after stopping the pool - if(stop) - throw std::runtime_error("enqueue on stopped ThreadPool"); - - tasks.emplace([task](){ (*task)(); }); - } - condition.notify_one(); - return res; -} - -// the destructor joins all threads -inline ThreadPool::~ThreadPool() -{ - { - std::unique_lock lock(queue_mutex); - stop = true; - } - condition.notify_all(); - for(std::thread &worker: workers) - worker.join(); -} - -#endif diff --git a/ThreadPool.hpp b/ThreadPool.hpp new file mode 100644 index 0000000..0ce8093 --- /dev/null +++ b/ThreadPool.hpp @@ -0,0 +1,118 @@ +/* +Copyright (c) 2012 Jakob Progsch, Václav Zeman + +This software is provided 'as-is', without any express or implied +warranty. In no event will the authors be held liable for any damages +arising from the use of this software. + +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it +freely, subject to the following restrictions: + + 1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + + 3. This notice may not be removed or altered from any source + distribution. +*/ + +#ifndef THREAD_POOL_HPP +#define THREAD_POOL_HPP + +// containers +#include +#include +// threading +#include +#include +#include +#include +#include +// utility wrappers +#include +#include +// exceptions +#include + +// std::thread pool for resources recycling +class ThreadPool { +public: + // the constructor just launches some amount of workers + ThreadPool(size_t threads_n = std::thread::hardware_concurrency()) : stop(false) + { + if(!threads_n) + throw std::invalid_argument("more than zero threads expected"); + + this->workers.reserve(threads_n); + for(; threads_n; --threads_n) + this->workers.emplace_back( + [this] + { + while(true) + { + std::function task; + + { + std::unique_lock lock(this->queue_mutex); + this->condition.wait(lock, + [this]{ return this->stop || !this->tasks.empty(); }); + if(this->stop && this->tasks.empty()) + return; + task = std::move(this->tasks.front()); + this->tasks.pop(); + } + + task(); + } + } + ); + } + // deleted copy&move ctors&assignments + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + ThreadPool(ThreadPool&&) = delete; + ThreadPool& operator=(ThreadPool&&) = delete; + // add new work item to the pool + template + std::future::type> enqueue(F&& f, Args&&... args) + { + using packaged_task_t = std::packaged_task::type ()>; + + std::shared_ptr task(new packaged_task_t( + std::bind(std::forward(f), std::forward(args)...) + )); + auto res = task->get_future(); + { + std::unique_lock lock(this->queue_mutex); + this->tasks.emplace([task](){ (*task)(); }); + } + this->condition.notify_one(); + return res; + } + // the destructor joins all threads + virtual ~ThreadPool() + { + this->stop = true; + this->condition.notify_all(); + for(std::thread& worker : this->workers) + worker.join(); + } +private: + // need to keep track of threads so we can join them + std::vector< std::thread > workers; + // the task queue + std::queue< std::function > tasks; + + // synchronization + std::mutex queue_mutex; + std::condition_variable condition; + // workers finalization flag + std::atomic_bool stop; +}; + +#endif // THREAD_POOL_HPP diff --git a/example.cpp b/example.cpp index 837277b..c3a65aa 100644 --- a/example.cpp +++ b/example.cpp @@ -2,7 +2,7 @@ #include #include -#include "ThreadPool.h" +#include "ThreadPool.hpp" int main() {