From 36f1a49abc37173cc93aeeb6c7d3971085e1d89a Mon Sep 17 00:00:00 2001 From: Youka Date: Fri, 10 Jul 2015 17:01:02 +0200 Subject: [PATCH 1/9] changed ThreadPool header file extension to .hpp Time to leave the past of C with objects and making a difference between these languages by using the appropriated C++ file extension --- ThreadPool.h => ThreadPool.hpp | 4 ++-- example.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename ThreadPool.h => ThreadPool.hpp (98%) diff --git a/ThreadPool.h b/ThreadPool.hpp similarity index 98% rename from ThreadPool.h rename to ThreadPool.hpp index 4183203..6579fea 100644 --- a/ThreadPool.h +++ b/ThreadPool.hpp @@ -1,5 +1,5 @@ -#ifndef THREAD_POOL_H -#define THREAD_POOL_H +#ifndef THREAD_POOL_HPP +#define THREAD_POOL_HPP #include #include diff --git a/example.cpp b/example.cpp index 837277b..c3a65aa 100644 --- a/example.cpp +++ b/example.cpp @@ -2,7 +2,7 @@ #include #include -#include "ThreadPool.h" +#include "ThreadPool.hpp" int main() { From 6ccbb6590706a8d2958e9867a700e7e28bfe9b65 Mon Sep 17 00:00:00 2001 From: Youka Date: Fri, 10 Jul 2015 17:16:16 +0200 Subject: [PATCH 2/9] reduced redundancy Merged definitions into class and simplified enqueue return type. Everyone uses editors with folding and lowering compile time is something worth. (Ctor & dtor in class gives the inline hint to compilers as well) --- ThreadPool.hpp | 130 +++++++++++++++++++++++-------------------------- 1 file changed, 60 insertions(+), 70 deletions(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 6579fea..4903a81 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -13,86 +13,76 @@ class ThreadPool { public: - ThreadPool(size_t); - template - auto enqueue(F&& f, Args&&... args) - -> std::future::type>; - ~ThreadPool(); -private: - // need to keep track of threads so we can join them - std::vector< std::thread > workers; - // the task queue - std::queue< std::function > tasks; - - // synchronization - std::mutex queue_mutex; - std::condition_variable condition; - bool stop; -}; - -// the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads) - : stop(false) -{ - for(size_t i = 0;i task; - + for(;;) { - std::unique_lock lock(this->queue_mutex); - this->condition.wait(lock, - [this]{ return this->stop || !this->tasks.empty(); }); - if(this->stop && this->tasks.empty()) - return; - task = std::move(this->tasks.front()); - this->tasks.pop(); - } + std::function task; - task(); + { + std::unique_lock lock(this->queue_mutex); + this->condition.wait(lock, + [this]{ return this->stop || !this->tasks.empty(); }); + if(this->stop && this->tasks.empty()) + return; + task = std::move(this->tasks.front()); + this->tasks.pop(); + } + + task(); + } } - } - ); -} + ); + } + // add new work item to the pool + template + std::future::type> enqueue(F&& f, Args&&... args) + { + using return_type = typename std::result_of::type; -// add new work item to the pool -template -auto ThreadPool::enqueue(F&& f, Args&&... args) - -> std::future::type> -{ - using return_type = typename std::result_of::type; + auto task = std::make_shared< std::packaged_task >( + std::bind(std::forward(f), std::forward(args)...) + ); - auto task = std::make_shared< std::packaged_task >( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); - { - std::unique_lock lock(queue_mutex); + std::future res = task->get_future(); + { + std::unique_lock lock(queue_mutex); - // don't allow enqueueing after stopping the pool - if(stop) - throw std::runtime_error("enqueue on stopped ThreadPool"); + // don't allow enqueueing after stopping the pool + if(stop) + throw std::runtime_error("enqueue on stopped ThreadPool"); - tasks.emplace([task](){ (*task)(); }); + tasks.emplace([task](){ (*task)(); }); + } + condition.notify_one(); + return res; } - condition.notify_one(); - return res; -} - -// the destructor joins all threads -inline ThreadPool::~ThreadPool() -{ + // the destructor joins all threads + ~ThreadPool() { - std::unique_lock lock(queue_mutex); - stop = true; + { + std::unique_lock lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for(std::thread &worker: workers) + worker.join(); } - condition.notify_all(); - for(std::thread &worker: workers) - worker.join(); -} +private: + // need to keep track of threads so we can join them + std::vector< std::thread > workers; + // the task queue + std::queue< std::function > tasks; + + // synchronization + std::mutex queue_mutex; + std::condition_variable condition; + bool stop; +}; #endif From a7d611f2c08383e20008f564d3085b279cadba9d Mon Sep 17 00:00:00 2001 From: Youka Date: Fri, 10 Jul 2015 17:26:40 +0200 Subject: [PATCH 3/9] changed stop flag to atomic type Atomics are cheaper for thread safety --- ThreadPool.hpp | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 4903a81..9562cee 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,10 @@ class ThreadPool { template std::future::type> enqueue(F&& f, Args&&... args) { + // don't allow enqueueing after stopping the pool + if(stop) + throw std::runtime_error("enqueue on stopped ThreadPool"); + using return_type = typename std::result_of::type; auto task = std::make_shared< std::packaged_task >( @@ -52,11 +57,6 @@ class ThreadPool { std::future res = task->get_future(); { std::unique_lock lock(queue_mutex); - - // don't allow enqueueing after stopping the pool - if(stop) - throw std::runtime_error("enqueue on stopped ThreadPool"); - tasks.emplace([task](){ (*task)(); }); } condition.notify_one(); @@ -65,10 +65,7 @@ class ThreadPool { // the destructor joins all threads ~ThreadPool() { - { - std::unique_lock lock(queue_mutex); - stop = true; - } + stop = true; condition.notify_all(); for(std::thread &worker: workers) worker.join(); @@ -82,7 +79,7 @@ class ThreadPool { // synchronization std::mutex queue_mutex; std::condition_variable condition; - bool stop; + std::atomic_bool stop; }; #endif From 0a0f766129d9e933a64ca834838fec066bc14462 Mon Sep 17 00:00:00 2001 From: Youka Date: Fri, 10 Jul 2015 17:50:39 +0200 Subject: [PATCH 4/9] slighly improved ctor & enqueue performance * Pre-allocated workers memory * Simplified creation loop from upcounter to countdown * Build task in-place instead of an additional assignment --- ThreadPool.hpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 9562cee..7d14c3d 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -17,7 +17,8 @@ class ThreadPool { // the constructor just launches some amount of workers ThreadPool(size_t threads) : stop(false) { - for(size_t i = 0;i::type; + using packaged_task_t = std::packaged_task::type ()>; - auto task = std::make_shared< std::packaged_task >( + std::shared_ptr task(new packaged_task_t( std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); + )); + auto res = task->get_future(); { std::unique_lock lock(queue_mutex); tasks.emplace([task](){ (*task)(); }); From 180241dc786fe912ecc1865281e1dc69c3e5d198 Mon Sep 17 00:00:00 2001 From: Youka Date: Fri, 10 Jul 2015 18:05:02 +0200 Subject: [PATCH 5/9] improved code readability * Added this-> to members to make scope more clear * Renamed ctor argument threads -> threads_n (which makes more sense) * Simplified for(;;) to while(true) which is more common and less optimizer work --- ThreadPool.hpp | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 7d14c3d..87fc019 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -15,14 +15,14 @@ class ThreadPool { public: // the constructor just launches some amount of workers - ThreadPool(size_t threads) : stop(false) + ThreadPool(size_t threads_n) : stop(false) { - workers.reserve(threads); - for(; threads; --threads) - workers.emplace_back( + this->workers.reserve(threads_n); + for(; threads_n; --threads_n) + this->workers.emplace_back( [this] { - for(;;) + while(true) { std::function task; @@ -46,7 +46,7 @@ class ThreadPool { std::future::type> enqueue(F&& f, Args&&... args) { // don't allow enqueueing after stopping the pool - if(stop) + if(this->stop) throw std::runtime_error("enqueue on stopped ThreadPool"); using packaged_task_t = std::packaged_task::type ()>; @@ -56,18 +56,18 @@ class ThreadPool { )); auto res = task->get_future(); { - std::unique_lock lock(queue_mutex); - tasks.emplace([task](){ (*task)(); }); + std::unique_lock lock(this->queue_mutex); + this->tasks.emplace([task](){ (*task)(); }); } - condition.notify_one(); + this->condition.notify_one(); return res; } // the destructor joins all threads ~ThreadPool() { - stop = true; - condition.notify_all(); - for(std::thread &worker: workers) + this->stop = true; + this->condition.notify_all(); + for(std::thread& worker : this->workers) worker.join(); } private: From 3393a0586f3d163438055993cda5b96ab29d9a70 Mon Sep 17 00:00:00 2001 From: Youka Date: Fri, 10 Jul 2015 18:10:14 +0200 Subject: [PATCH 6/9] added optional ctor argument Why not trying what the OS recommends? --- ThreadPool.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 87fc019..528641e 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -15,7 +15,7 @@ class ThreadPool { public: // the constructor just launches some amount of workers - ThreadPool(size_t threads_n) : stop(false) + ThreadPool(size_t threads_n = std::thread::hardware_concurrency()) : stop(false) { this->workers.reserve(threads_n); for(; threads_n; --threads_n) From 6ba7e87f2dc83ffa48f801d7241d512bdb260f8e Mon Sep 17 00:00:00 2001 From: Youka Date: Fri, 10 Jul 2015 18:19:12 +0200 Subject: [PATCH 7/9] improved arguments check When an equeue happens during the pool destruction, the user did something terribly wrong! Zero threads are possible but result in an useless pool. There aren't plans for dummy pools, right? --- ThreadPool.hpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 528641e..e58549b 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -17,6 +17,9 @@ class ThreadPool { // the constructor just launches some amount of workers ThreadPool(size_t threads_n = std::thread::hardware_concurrency()) : stop(false) { + if(!threads_n) + throw std::invalid_argument("more than zero threads expected"); + this->workers.reserve(threads_n); for(; threads_n; --threads_n) this->workers.emplace_back( @@ -45,10 +48,6 @@ class ThreadPool { template std::future::type> enqueue(F&& f, Args&&... args) { - // don't allow enqueueing after stopping the pool - if(this->stop) - throw std::runtime_error("enqueue on stopped ThreadPool"); - using packaged_task_t = std::packaged_task::type ()>; std::shared_ptr task(new packaged_task_t( From bb393fe75b37b833923bccc3bea34b4da4466414 Mon Sep 17 00:00:00 2001 From: Youka Date: Fri, 10 Jul 2015 18:34:53 +0200 Subject: [PATCH 8/9] fulfill rule-of-5 Deleted copy&move ctors&assignments because of limitations by attributes (sync objects have nothing of both). Set dtor virtual for polymorphism - ThreadPool has attributes which need to get their dtors called too and running threads shouldn't get lost. --- ThreadPool.hpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index e58549b..55d08d5 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -44,6 +44,10 @@ class ThreadPool { } ); } + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + ThreadPool(ThreadPool&&) = delete; + ThreadPool& operator=(ThreadPool&&) = delete; // add new work item to the pool template std::future::type> enqueue(F&& f, Args&&... args) @@ -62,7 +66,7 @@ class ThreadPool { return res; } // the destructor joins all threads - ~ThreadPool() + virtual ~ThreadPool() { this->stop = true; this->condition.notify_all(); From 51737c36b88900cc41edf2e88398c58bb4d02f85 Mon Sep 17 00:00:00 2001 From: Youka Date: Fri, 10 Jul 2015 18:49:10 +0200 Subject: [PATCH 9/9] added comments & license in source Storing the license in source files too is safer. --- ThreadPool.hpp | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 55d08d5..0ce8093 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -1,17 +1,45 @@ +/* +Copyright (c) 2012 Jakob Progsch, Václav Zeman + +This software is provided 'as-is', without any express or implied +warranty. In no event will the authors be held liable for any damages +arising from the use of this software. + +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it +freely, subject to the following restrictions: + + 1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + + 3. This notice may not be removed or altered from any source + distribution. +*/ + #ifndef THREAD_POOL_HPP #define THREAD_POOL_HPP +// containers #include #include -#include +// threading #include #include #include #include #include +// utility wrappers +#include #include +// exceptions #include +// std::thread pool for resources recycling class ThreadPool { public: // the constructor just launches some amount of workers @@ -44,6 +72,7 @@ class ThreadPool { } ); } + // deleted copy&move ctors&assignments ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; ThreadPool(ThreadPool&&) = delete; @@ -82,7 +111,8 @@ class ThreadPool { // synchronization std::mutex queue_mutex; std::condition_variable condition; + // workers finalization flag std::atomic_bool stop; }; -#endif +#endif // THREAD_POOL_HPP