Skip to content

Commit 374d058

Browse files
authored
ThreadPool: optional limit for jobs queue (yhirose#1741)
For very busy servers, the internal jobs queue where accepted sockets are enqueued can grow without limit. This is a problem for two reasons: - queueing too much work causes the server to respond with huge latency, resulting in repeated timeouts on the clients; it is definitely better to reject the connection early, so that the client receives the backpressure signal as soon as the queue is becoming too large - the jobs list can eventually cause an out of memory condition
1 parent 31cdcc3 commit 374d058

File tree

3 files changed

+118
-10
lines changed

3 files changed

+118
-10
lines changed

README.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,17 @@ If you want to set the thread count at runtime, there is no convenient way... Bu
433433
svr.new_task_queue = [] { return new ThreadPool(12); };
434434
```
435435
436+
You can also provide an optional parameter to limit the maximum number
437+
of pending requests, i.e. requests `accept()`ed by the listener but
438+
still waiting to be serviced by worker threads.
439+
440+
```cpp
441+
svr.new_task_queue = [] { return new ThreadPool(/*num_threads=*/12, /*max_queued_requests=*/18); };
442+
```
443+
444+
Default limit is 0 (unlimited). Once the limit is reached, the listener
445+
will shut down the client connection.
446+
436447
### Override the default thread pool with yours
437448

438449
You can supply your own thread pool implementation according to your need.
@@ -444,8 +455,10 @@ public:
444455
pool_.start_with_thread_count(n);
445456
}
446457

447-
virtual void enqueue(std::function<void()> fn) override {
448-
pool_.enqueue(fn);
458+
virtual bool enqueue(std::function<void()> fn) override {
459+
/* Return true if the task was actually enqueued, or false
460+
* if the caller must drop the corresponding connection. */
461+
return pool_.enqueue(fn);
449462
}
450463

451464
virtual void shutdown() override {

httplib.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -653,15 +653,16 @@ class TaskQueue {
653653
TaskQueue() = default;
654654
virtual ~TaskQueue() = default;
655655

656-
virtual void enqueue(std::function<void()> fn) = 0;
656+
virtual bool enqueue(std::function<void()> fn) = 0;
657657
virtual void shutdown() = 0;
658658

659659
virtual void on_idle() {}
660660
};
661661

662662
class ThreadPool : public TaskQueue {
663663
public:
664-
explicit ThreadPool(size_t n) : shutdown_(false) {
664+
explicit ThreadPool(size_t n, size_t mqr = 0)
665+
: shutdown_(false), max_queued_requests_(mqr) {
665666
while (n) {
666667
threads_.emplace_back(worker(*this));
667668
n--;
@@ -671,13 +672,17 @@ class ThreadPool : public TaskQueue {
671672
ThreadPool(const ThreadPool &) = delete;
672673
~ThreadPool() override = default;
673674

674-
void enqueue(std::function<void()> fn) override {
675+
bool enqueue(std::function<void()> fn) override {
675676
{
676677
std::unique_lock<std::mutex> lock(mutex_);
678+
if (max_queued_requests_ > 0 && jobs_.size() >= max_queued_requests_) {
679+
return false;
680+
}
677681
jobs_.push_back(std::move(fn));
678682
}
679683

680684
cond_.notify_one();
685+
return true;
681686
}
682687

683688
void shutdown() override {
@@ -727,6 +732,7 @@ class ThreadPool : public TaskQueue {
727732
std::list<std::function<void()>> jobs_;
728733

729734
bool shutdown_;
735+
size_t max_queued_requests_ = 0;
730736

731737
std::condition_variable cond_;
732738
std::mutex mutex_;
@@ -6319,7 +6325,11 @@ inline bool Server::listen_internal() {
63196325
#endif
63206326
}
63216327

6322-
task_queue->enqueue([this, sock]() { process_and_close_socket(sock); });
6328+
if (!task_queue->enqueue(
6329+
[this, sock]() { process_and_close_socket(sock); })) {
6330+
detail::shutdown_socket(sock);
6331+
detail::close_socket(sock);
6332+
}
63236333
}
63246334

63256335
task_queue->shutdown();

test/test.cc

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6511,18 +6511,103 @@ TEST(SocketStream, is_writable_INET) {
65116511
#endif // #ifndef _WIN32
65126512

65136513
TEST(TaskQueueTest, IncreaseAtomicInteger) {
6514-
static constexpr unsigned int number_of_task{1000000};
6514+
static constexpr unsigned int number_of_tasks{1000000};
65156515
std::atomic_uint count{0};
65166516
std::unique_ptr<TaskQueue> task_queue{
65176517
new ThreadPool{CPPHTTPLIB_THREAD_POOL_COUNT}};
65186518

6519-
for (unsigned int i = 0; i < number_of_task; ++i) {
6520-
task_queue->enqueue(
6519+
for (unsigned int i = 0; i < number_of_tasks; ++i) {
6520+
auto queued = task_queue->enqueue(
65216521
[&count] { count.fetch_add(1, std::memory_order_relaxed); });
6522+
EXPECT_TRUE(queued);
6523+
}
6524+
6525+
EXPECT_NO_THROW(task_queue->shutdown());
6526+
EXPECT_EQ(number_of_tasks, count.load());
6527+
}
6528+
6529+
TEST(TaskQueueTest, IncreaseAtomicIntegerWithQueueLimit) {
6530+
static constexpr unsigned int number_of_tasks{1000000};
6531+
static constexpr unsigned int qlimit{2};
6532+
unsigned int queued_count{0};
6533+
std::atomic_uint count{0};
6534+
std::unique_ptr<TaskQueue> task_queue{
6535+
new ThreadPool{/*num_threads=*/1, qlimit}};
6536+
6537+
for (unsigned int i = 0; i < number_of_tasks; ++i) {
6538+
if (task_queue->enqueue(
6539+
[&count] { count.fetch_add(1, std::memory_order_relaxed); })) {
6540+
queued_count++;
6541+
}
6542+
}
6543+
6544+
EXPECT_NO_THROW(task_queue->shutdown());
6545+
EXPECT_EQ(queued_count, count.load());
6546+
EXPECT_TRUE(queued_count <= number_of_tasks);
6547+
EXPECT_TRUE(queued_count >= qlimit);
6548+
}
6549+
6550+
TEST(TaskQueueTest, MaxQueuedRequests) {
6551+
static constexpr unsigned int qlimit{3};
6552+
std::unique_ptr<TaskQueue> task_queue{new ThreadPool{1, qlimit}};
6553+
std::condition_variable sem_cv;
6554+
std::mutex sem_mtx;
6555+
int credits = 0;
6556+
bool queued;
6557+
6558+
/* Fill up the queue with tasks that will block until we give them credits to
6559+
* complete. */
6560+
for (unsigned int n = 0; n <= qlimit;) {
6561+
queued = task_queue->enqueue([&sem_mtx, &sem_cv, &credits] {
6562+
std::unique_lock<std::mutex> lock(sem_mtx);
6563+
while (credits <= 0) {
6564+
sem_cv.wait(lock);
6565+
}
6566+
/* Consume the credit and signal the test code if they are all gone. */
6567+
if (--credits == 0) { sem_cv.notify_one(); }
6568+
});
6569+
6570+
if (n < qlimit) {
6571+
/* The first qlimit enqueues must succeed. */
6572+
EXPECT_TRUE(queued);
6573+
} else {
6574+
/* The last one will succeed only when the worker thread
6575+
* starts and dequeues the first blocking task. Although
6576+
* not necessary for the correctness of this test, we sleep for
6577+
* a short while to avoid busy waiting. */
6578+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
6579+
}
6580+
if (queued) { n++; }
6581+
}
6582+
6583+
/* Further enqueues must fail since the queue is full. */
6584+
for (auto i = 0; i < 4; i++) {
6585+
queued = task_queue->enqueue([] {});
6586+
EXPECT_FALSE(queued);
6587+
}
6588+
6589+
/* Give the credits to allow the previous tasks to complete. */
6590+
{
6591+
std::unique_lock<std::mutex> lock(sem_mtx);
6592+
credits += qlimit + 1;
6593+
}
6594+
sem_cv.notify_all();
6595+
6596+
/* Wait for all the credits to be consumed. */
6597+
{
6598+
std::unique_lock<std::mutex> lock(sem_mtx);
6599+
while (credits > 0) {
6600+
sem_cv.wait(lock);
6601+
}
6602+
}
6603+
6604+
/* Check that we are able again to enqueue at least qlimit tasks. */
6605+
for (unsigned int i = 0; i < qlimit; i++) {
6606+
queued = task_queue->enqueue([] {});
6607+
EXPECT_TRUE(queued);
65226608
}
65236609

65246610
EXPECT_NO_THROW(task_queue->shutdown());
6525-
EXPECT_EQ(number_of_task, count.load());
65266611
}
65276612

65286613
TEST(RedirectTest, RedirectToUrlWithQueryParameters) {

0 commit comments

Comments
 (0)