From 20151987e57120b77a0db9fc1fa64ea582f8c9c4 Mon Sep 17 00:00:00 2001 From: HO-COOH <42881734+HO-COOH@users.noreply.github.com> Date: Tue, 25 Feb 2020 20:12:39 -0600 Subject: [PATCH 1/4] C++17 -ify --- ThreadPool.h | 79 +++++++++++++++++++++++++++------------------------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/ThreadPool.h b/ThreadPool.h index 4183203..4683277 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -14,70 +14,51 @@ class ThreadPool { public: ThreadPool(size_t); + template - auto enqueue(F&& f, Args&&... args) - -> std::future::type>; + decltype(auto) enqueue(F&& f, Args&&... args); + ~ThreadPool(); private: // need to keep track of threads so we can join them std::vector< std::thread > workers; + void newWorker(); // the task queue std::queue< std::function > tasks; - + // synchronization std::mutex queue_mutex; std::condition_variable condition; - bool stop; + bool stop = false; }; - + // the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads) - : stop(false) +inline ThreadPool::ThreadPool(size_t threads=std::thread::hardware_concurrency()) { - for(size_t i = 0;i task; - - { - std::unique_lock lock(this->queue_mutex); - this->condition.wait(lock, - [this]{ return this->stop || !this->tasks.empty(); }); - if(this->stop && this->tasks.empty()) - return; - task = std::move(this->tasks.front()); - this->tasks.pop(); - } - - task(); - } - } - ); + workers.reserve(threads); + for (size_t i = 0; i < threads; ++i) + newWorker(); } // add new work item to the pool template -auto ThreadPool::enqueue(F&& f, Args&&... args) - -> std::future::type> +decltype(auto) ThreadPool::enqueue(F&& f, Args&&... args) { - using return_type = typename std::result_of::type; + using return_type = std::invoke_result_t; auto task = std::make_shared< std::packaged_task >( - std::bind(std::forward(f), std::forward(args)...) + std::bind(std::forward(f), std::forward(args)...) ); - + std::future res = task->get_future(); { std::unique_lock lock(queue_mutex); // don't allow enqueueing after stopping the pool - if(stop) + if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); - tasks.emplace([task](){ (*task)(); }); + tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; @@ -87,12 +68,34 @@ auto ThreadPool::enqueue(F&& f, Args&&... args) inline ThreadPool::~ThreadPool() { { - std::unique_lock lock(queue_mutex); + std::lock_guard lock{ queue_mutex }; stop = true; } condition.notify_all(); - for(std::thread &worker: workers) + for (auto& worker : workers) worker.join(); } +inline void ThreadPool::newWorker() +{ + workers.emplace_back( + [this] + { + for (;;) + { + std::function task; + { + std::unique_lock lock(this->queue_mutex); + this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); + if (this->stop && this->tasks.empty()) + return; + task = std::move(this->tasks.front()); + this->tasks.pop(); + } + task(); + } + } + ); +} + #endif From 531e7bcbf9e39d6323a11235ee3dbe7113e09abe Mon Sep 17 00:00:00 2001 From: HO-COOH <42881734+HO-COOH@users.noreply.github.com> Date: Wed, 26 Feb 2020 00:46:33 -0600 Subject: [PATCH 2/4] Add maximum queue size and related sychronization --- ThreadPool.h | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/ThreadPool.h b/ThreadPool.h index 4683277..004bc2c 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -10,7 +10,7 @@ #include #include #include - +constexpr size_t maxQueueSize = 100; class ThreadPool { public: ThreadPool(size_t); @@ -19,14 +19,14 @@ class ThreadPool { decltype(auto) enqueue(F&& f, Args&&... args); ~ThreadPool(); + private: - // need to keep track of threads so we can join them - std::vector< std::thread > workers; + std::vector workers; + std::queue> tasks; void newWorker(); - // the task queue - std::queue< std::function > tasks; - // synchronization + std::condition_variable tasks_full_flag; //block enqueue if tasks queue is full + // synchronization (task queue <-> workers) std::mutex queue_mutex; std::condition_variable condition; bool stop = false; @@ -58,6 +58,9 @@ decltype(auto) ThreadPool::enqueue(F&& f, Args&&... args) if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); + if (tasks.size() >= maxQueueSize) + tasks_full_flag.wait(lock, [&]() {return tasks.size() < maxQueueSize; }); + tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); @@ -78,19 +81,21 @@ inline ThreadPool::~ThreadPool() inline void ThreadPool::newWorker() { + auto& flag = tasks_full_flag; workers.emplace_back( - [this] + [this, &flag] { for (;;) { std::function task; { std::unique_lock lock(this->queue_mutex); - this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); + this->condition.wait(lock, [&] { return this->stop || !this->tasks.empty(); }); if (this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); + flag.notify_one(); } task(); } From 9c884a262071d5226ff1c0d87bbccc373fbc3eba Mon Sep 17 00:00:00 2001 From: HO-COOH <42881734+HO-COOH@users.noreply.github.com> Date: Tue, 26 Jan 2021 21:05:01 -0600 Subject: [PATCH 3/4] C++20 ify --- ThreadPool.h | 132 ++++++++++++++++++++++----------------------------- 1 file changed, 56 insertions(+), 76 deletions(-) diff --git a/ThreadPool.h b/ThreadPool.h index 004bc2c..4ca0d6b 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -11,96 +11,76 @@ #include #include constexpr size_t maxQueueSize = 100; -class ThreadPool { -public: - ThreadPool(size_t); - - template - decltype(auto) enqueue(F&& f, Args&&... args); - - ~ThreadPool(); - -private: - std::vector workers; - std::queue> tasks; - void newWorker(); - - std::condition_variable tasks_full_flag; //block enqueue if tasks queue is full - // synchronization (task queue <-> workers) - std::mutex queue_mutex; - std::condition_variable condition; - bool stop = false; -}; - -// the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads=std::thread::hardware_concurrency()) -{ - workers.reserve(threads); - for (size_t i = 0; i < threads; ++i) - newWorker(); -} -// add new work item to the pool -template -decltype(auto) ThreadPool::enqueue(F&& f, Args&&... args) +class ThreadPool { - using return_type = std::invoke_result_t; - - auto task = std::make_shared< std::packaged_task >( - std::bind(std::forward(f), std::forward(args)...) - ); +public: + ThreadPool(size_t threads = std::thread::hardware_concurrency() - 1) + { + workers.reserve(threads); + for (size_t i{}; i < threads; ++i) + makeThread(); + } - std::future res = task->get_future(); + template + auto enqueue(F &&f, Args &&... args) { - std::unique_lock lock(queue_mutex); + using return_type = std::invoke_result_t; + //...v This need C++20! + std::packaged_task task{[f = std::forward(f), ... args = std::forward(args)]() mutable { return f(std::forward(args)...); }}; + auto result = task.get_future(); + { + std::unique_lock lock(queue_mutex); - // don't allow enqueueing after stopping the pool - if (stop) - throw std::runtime_error("enqueue on stopped ThreadPool"); + // don't allow enqueueing after stopping the pool + if (stop) + throw std::runtime_error("enqueue on stopped ThreadPool"); - if (tasks.size() >= maxQueueSize) - tasks_full_flag.wait(lock, [&]() {return tasks.size() < maxQueueSize; }); + if (tasks.size() >= maxQueueSize) + queueStatus.wait(lock, [&]() { return tasks.size() < maxQueueSize; }); - tasks.emplace([task]() { (*task)(); }); + tasks.emplace(std::move(task));//No need for std::shared_ptr anymore! + } + hasNewTask.notify_one(); + return result; } - condition.notify_one(); - return res; -} -// the destructor joins all threads -inline ThreadPool::~ThreadPool() -{ + ~ThreadPool() { - std::lock_guard lock{ queue_mutex }; stop = true; + hasNewTask.notify_all(); + for (auto &worker : workers) + worker.join(); } - condition.notify_all(); - for (auto& worker : workers) - worker.join(); -} -inline void ThreadPool::newWorker() -{ - auto& flag = tasks_full_flag; - workers.emplace_back( - [this, &flag] - { - for (;;) - { - std::function task; +private: + std::vector workers; + std::queue> tasks; + std::mutex queue_mutex; + std::condition_variable queueStatus; + std::condition_variable hasNewTask; + std::atomic_bool stop{false}; + + void makeThread() + { + workers.emplace_back( + [this] { + while (true) { - std::unique_lock lock(this->queue_mutex); - this->condition.wait(lock, [&] { return this->stop || !this->tasks.empty(); }); - if (this->stop && this->tasks.empty()) - return; - task = std::move(this->tasks.front()); - this->tasks.pop(); - flag.notify_one(); + std::packaged_task task; + { + std::unique_lock lock(this->queue_mutex); + this->hasNewTask.wait(lock, [&] { return this->stop || !this->tasks.empty(); }); + if (this->stop && this->tasks.empty()) + return; + task = std::move(this->tasks.front()); + this->tasks.pop(); + queueStatus.notify_one(); + } + task(); } - task(); - } - } - ); -} + }); + } +}; #endif From d4308de74c745ebcec2a9046d08355f5b84c8916 Mon Sep 17 00:00:00 2001 From: HO-COOH <42881734+HO-COOH@users.noreply.github.com> Date: Mon, 1 Feb 2021 23:31:15 -0600 Subject: [PATCH 4/4] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index be3d25a..4c771f5 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ ThreadPool ========== -A simple C++11 Thread Pool implementation. +A simple C++20 Thread Pool implementation. Basic usage: ```c++ // create thread pool with 4 worker threads -ThreadPool pool(4); +ThreadPool pool{4}; // enqueue and store future auto result = pool.enqueue([](int answer) { return answer; }, 42);