mirror of
https://github.com/yhirose/cpp-httplib.git
synced 2026-04-11 19:28:30 +00:00
* WebSocket support * Validate selected subprotocol in WebSocket handshake * Fix problem with a Unit test * Dynamic Thread Pool support * Fix race condition in new Dynamic ThreadPool
229 lines
5.5 KiB
C++
229 lines
5.5 KiB
C++
// ThreadPool unit tests
|
|
// Set a short idle timeout for faster shrink tests
|
|
#define CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT 1
|
|
|
|
#include <httplib.h>
|
|
|
|
#include <gtest/gtest.h>
|
|
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
using namespace httplib;
|
|
|
|
TEST(ThreadPoolTest, BasicTaskExecution) {
|
|
ThreadPool pool(4);
|
|
std::atomic<int> count(0);
|
|
|
|
for (int i = 0; i < 10; i++) {
|
|
pool.enqueue([&count]() { count++; });
|
|
}
|
|
|
|
pool.shutdown();
|
|
EXPECT_EQ(10, count.load());
|
|
}
|
|
|
|
TEST(ThreadPoolTest, FixedPoolWhenMaxEqualsBase) {
|
|
// max_n == 0 means max = base (fixed pool behavior)
|
|
ThreadPool pool(4);
|
|
std::atomic<int> count(0);
|
|
|
|
for (int i = 0; i < 100; i++) {
|
|
pool.enqueue([&count]() { count++; });
|
|
}
|
|
|
|
pool.shutdown();
|
|
EXPECT_EQ(100, count.load());
|
|
}
|
|
|
|
TEST(ThreadPoolTest, DynamicScaleUp) {
|
|
// base=2, max=8: block 2 base threads, then enqueue more tasks
|
|
ThreadPool pool(2, 8);
|
|
|
|
std::atomic<int> active(0);
|
|
std::atomic<int> max_active(0);
|
|
std::atomic<int> completed(0);
|
|
std::mutex barrier_mutex;
|
|
std::condition_variable barrier_cv;
|
|
bool release = false;
|
|
|
|
// Occupy all base threads with blocking tasks
|
|
for (int i = 0; i < 2; i++) {
|
|
pool.enqueue([&]() {
|
|
active++;
|
|
{
|
|
std::unique_lock<std::mutex> lock(barrier_mutex);
|
|
barrier_cv.wait(lock, [&] { return release; });
|
|
}
|
|
active--;
|
|
completed++;
|
|
});
|
|
}
|
|
|
|
// Wait for base threads to be occupied
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
// These should trigger dynamic thread creation
|
|
for (int i = 0; i < 4; i++) {
|
|
pool.enqueue([&]() {
|
|
int cur = ++active;
|
|
// Track peak active count
|
|
int prev = max_active.load();
|
|
while (cur > prev && !max_active.compare_exchange_weak(prev, cur)) {}
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
|
active--;
|
|
completed++;
|
|
});
|
|
}
|
|
|
|
// Wait for dynamic tasks to complete
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
|
|
|
// Release the blocking tasks
|
|
{
|
|
std::unique_lock<std::mutex> lock(barrier_mutex);
|
|
release = true;
|
|
}
|
|
barrier_cv.notify_all();
|
|
|
|
pool.shutdown();
|
|
EXPECT_EQ(6, completed.load());
|
|
// More than 2 threads were active simultaneously
|
|
EXPECT_GT(max_active.load(), 2);
|
|
}
|
|
|
|
TEST(ThreadPoolTest, DynamicShrinkAfterIdle) {
|
|
// CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT is set to 1 second
|
|
ThreadPool pool(2, 8);
|
|
|
|
std::atomic<int> completed(0);
|
|
|
|
// Enqueue tasks that require dynamic threads
|
|
for (int i = 0; i < 8; i++) {
|
|
pool.enqueue([&]() {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
completed++;
|
|
});
|
|
}
|
|
|
|
// Wait for all tasks to complete + idle timeout + margin
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
|
|
// Now enqueue a simple task to verify the pool still works
|
|
// (base threads are still alive)
|
|
std::atomic<bool> final_task_done(false);
|
|
pool.enqueue([&]() { final_task_done = true; });
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
pool.shutdown();
|
|
EXPECT_EQ(8, completed.load());
|
|
EXPECT_TRUE(final_task_done.load());
|
|
}
|
|
|
|
TEST(ThreadPoolTest, ShutdownWithActiveDynamicThreads) {
|
|
ThreadPool pool(2, 8);
|
|
|
|
std::atomic<int> started(0);
|
|
|
|
std::mutex block_mutex;
|
|
std::condition_variable block_cv;
|
|
bool release = false;
|
|
|
|
// Start tasks on dynamic threads that block until released
|
|
for (int i = 0; i < 6; i++) {
|
|
pool.enqueue([&]() {
|
|
started++;
|
|
std::unique_lock<std::mutex> lock(block_mutex);
|
|
block_cv.wait(lock, [&] { return release; });
|
|
});
|
|
}
|
|
|
|
// Wait for tasks to start
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
|
EXPECT_GE(started.load(), 2);
|
|
|
|
// Release all blocked threads, then shutdown
|
|
{
|
|
std::unique_lock<std::mutex> lock(block_mutex);
|
|
release = true;
|
|
}
|
|
block_cv.notify_all();
|
|
|
|
pool.shutdown();
|
|
}
|
|
|
|
TEST(ThreadPoolTest, MaxQueuedRequests) {
|
|
// base=2, max=2 (fixed), mqr=3
|
|
ThreadPool pool(2, 2, 3);
|
|
|
|
std::mutex block_mutex;
|
|
std::condition_variable block_cv;
|
|
bool release = false;
|
|
|
|
// Block both threads
|
|
for (int i = 0; i < 2; i++) {
|
|
EXPECT_TRUE(pool.enqueue([&]() {
|
|
std::unique_lock<std::mutex> lock(block_mutex);
|
|
block_cv.wait(lock, [&] { return release; });
|
|
}));
|
|
}
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
// Fill the queue up to max_queued_requests
|
|
EXPECT_TRUE(pool.enqueue([]() {}));
|
|
EXPECT_TRUE(pool.enqueue([]() {}));
|
|
EXPECT_TRUE(pool.enqueue([]() {}));
|
|
|
|
// This should fail - queue is full
|
|
EXPECT_FALSE(pool.enqueue([]() {}));
|
|
|
|
// Release blocked threads
|
|
{
|
|
std::unique_lock<std::mutex> lock(block_mutex);
|
|
release = true;
|
|
}
|
|
block_cv.notify_all();
|
|
|
|
pool.shutdown();
|
|
}
|
|
|
|
#ifndef CPPHTTPLIB_NO_EXCEPTIONS
|
|
TEST(ThreadPoolTest, InvalidMaxThreadsThrows) {
|
|
// max_n < n should throw
|
|
EXPECT_THROW(ThreadPool(8, 4), std::invalid_argument);
|
|
}
|
|
#endif
|
|
|
|
TEST(ThreadPoolTest, EnqueueAfterShutdownReturnsFalse) {
|
|
ThreadPool pool(2);
|
|
pool.shutdown();
|
|
EXPECT_FALSE(pool.enqueue([]() {}));
|
|
}
|
|
|
|
TEST(ThreadPoolTest, ConcurrentEnqueue) {
|
|
ThreadPool pool(4, 16);
|
|
std::atomic<int> count(0);
|
|
const int num_producers = 4;
|
|
const int tasks_per_producer = 100;
|
|
|
|
std::vector<std::thread> producers;
|
|
for (int p = 0; p < num_producers; p++) {
|
|
producers.emplace_back([&]() {
|
|
for (int i = 0; i < tasks_per_producer; i++) {
|
|
pool.enqueue([&count]() { count++; });
|
|
}
|
|
});
|
|
}
|
|
|
|
for (auto &t : producers) {
|
|
t.join();
|
|
}
|
|
|
|
pool.shutdown();
|
|
EXPECT_EQ(num_producers * tasks_per_producer, count.load());
|
|
}
|