// ThreadPool unit tests // Set a short idle timeout for faster shrink tests #define CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT 1 #include #include #include #include #include #include using namespace httplib; TEST(ThreadPoolTest, BasicTaskExecution) { ThreadPool pool(4); std::atomic count(0); for (int i = 0; i < 10; i++) { pool.enqueue([&count]() { count++; }); } pool.shutdown(); EXPECT_EQ(10, count.load()); } TEST(ThreadPoolTest, FixedPoolWhenMaxEqualsBase) { // max_n == 0 means max = base (fixed pool behavior) ThreadPool pool(4); std::atomic count(0); for (int i = 0; i < 100; i++) { pool.enqueue([&count]() { count++; }); } pool.shutdown(); EXPECT_EQ(100, count.load()); } TEST(ThreadPoolTest, DynamicScaleUp) { // base=2, max=8: block 2 base threads, then enqueue more tasks ThreadPool pool(2, 8); std::atomic active(0); std::atomic max_active(0); std::atomic completed(0); std::mutex barrier_mutex; std::condition_variable barrier_cv; bool release = false; // Occupy all base threads with blocking tasks for (int i = 0; i < 2; i++) { pool.enqueue([&]() { active++; { std::unique_lock lock(barrier_mutex); barrier_cv.wait(lock, [&] { return release; }); } active--; completed++; }); } // Wait for base threads to be occupied std::this_thread::sleep_for(std::chrono::milliseconds(100)); // These should trigger dynamic thread creation for (int i = 0; i < 4; i++) { pool.enqueue([&]() { int cur = ++active; // Track peak active count int prev = max_active.load(); while (cur > prev && !max_active.compare_exchange_weak(prev, cur)) {} std::this_thread::sleep_for(std::chrono::milliseconds(50)); active--; completed++; }); } // Wait for dynamic tasks to complete std::this_thread::sleep_for(std::chrono::milliseconds(500)); // Release the blocking tasks { std::unique_lock lock(barrier_mutex); release = true; } barrier_cv.notify_all(); pool.shutdown(); EXPECT_EQ(6, completed.load()); // More than 2 threads were active simultaneously EXPECT_GT(max_active.load(), 2); } TEST(ThreadPoolTest, DynamicShrinkAfterIdle) { // CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT is set to 1 second ThreadPool pool(2, 8); std::atomic completed(0); // Enqueue tasks that require dynamic threads for (int i = 0; i < 8; i++) { pool.enqueue([&]() { std::this_thread::sleep_for(std::chrono::milliseconds(100)); completed++; }); } // Wait for all tasks to complete + idle timeout + margin std::this_thread::sleep_for(std::chrono::milliseconds(2500)); // Now enqueue a simple task to verify the pool still works // (base threads are still alive) std::atomic final_task_done(false); pool.enqueue([&]() { final_task_done = true; }); std::this_thread::sleep_for(std::chrono::milliseconds(100)); pool.shutdown(); EXPECT_EQ(8, completed.load()); EXPECT_TRUE(final_task_done.load()); } TEST(ThreadPoolTest, ShutdownWithActiveDynamicThreads) { ThreadPool pool(2, 8); std::atomic started(0); std::mutex block_mutex; std::condition_variable block_cv; bool release = false; // Start tasks on dynamic threads that block until released for (int i = 0; i < 6; i++) { pool.enqueue([&]() { started++; std::unique_lock lock(block_mutex); block_cv.wait(lock, [&] { return release; }); }); } // Wait for tasks to start std::this_thread::sleep_for(std::chrono::milliseconds(200)); EXPECT_GE(started.load(), 2); // Release all blocked threads, then shutdown { std::unique_lock lock(block_mutex); release = true; } block_cv.notify_all(); pool.shutdown(); } TEST(ThreadPoolTest, MaxQueuedRequests) { // base=2, max=2 (fixed), mqr=3 ThreadPool pool(2, 2, 3); std::mutex block_mutex; std::condition_variable block_cv; bool release = false; // Block both threads for (int i = 0; i < 2; i++) { EXPECT_TRUE(pool.enqueue([&]() { std::unique_lock lock(block_mutex); block_cv.wait(lock, [&] { return release; }); })); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Fill the queue up to max_queued_requests EXPECT_TRUE(pool.enqueue([]() {})); EXPECT_TRUE(pool.enqueue([]() {})); EXPECT_TRUE(pool.enqueue([]() {})); // This should fail - queue is full EXPECT_FALSE(pool.enqueue([]() {})); // Release blocked threads { std::unique_lock lock(block_mutex); release = true; } block_cv.notify_all(); pool.shutdown(); } #ifndef CPPHTTPLIB_NO_EXCEPTIONS TEST(ThreadPoolTest, InvalidMaxThreadsThrows) { // max_n < n should throw EXPECT_THROW(ThreadPool(8, 4), std::invalid_argument); } #endif // Issue #2444: ThreadPool constructor must be exception-safe when std::thread // construction fails partway (e.g., pthread_create returns EAGAIN under thread // resource pressure). Without proper handling, the partially-built threads_ // vector destroys joinable std::thread objects, calling std::terminate(). // // We reproduce the failure portably by interposing pthread_create at link // time: while the counter is armed, the first N calls succeed, the rest // return EAGAIN. This is gated to POSIX + exceptions-enabled builds. #ifndef CPPHTTPLIB_NO_EXCEPTIONS #if defined(__unix__) || defined(__APPLE__) #include #include #include namespace { // -1 = pass-through (default). >= 0 = number of remaining successful calls // before EAGAIN is returned. Reset to -1 after each test that arms it. std::atomic g_pthread_create_remaining{-1}; } // namespace extern "C" int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg) { using fn_t = int (*)(pthread_t *, const pthread_attr_t *, void *(*)(void *), void *); static fn_t real = reinterpret_cast(dlsym(RTLD_NEXT, "pthread_create")); int n = g_pthread_create_remaining.load(std::memory_order_relaxed); if (n == 0) { return EAGAIN; } if (n > 0) { g_pthread_create_remaining.fetch_sub(1, std::memory_order_relaxed); } return real(thread, attr, start_routine, arg); } TEST(ThreadPoolTest, ConstructorRecoversWhenThreadCreationFails) { // Allow only the first thread to spawn; subsequent pthread_create calls // return EAGAIN, causing std::thread() to throw std::system_error mid-loop. g_pthread_create_remaining.store(1); bool caught = false; try { ThreadPool pool(/*n=*/4); (void)pool; } catch (const std::system_error &) { caught = true; } catch (...) { caught = true; } // Disarm before any further test runs. g_pthread_create_remaining.store(-1); EXPECT_TRUE(caught); } #endif // POSIX #endif // CPPHTTPLIB_NO_EXCEPTIONS TEST(ThreadPoolTest, EnqueueAfterShutdownReturnsFalse) { ThreadPool pool(2); pool.shutdown(); EXPECT_FALSE(pool.enqueue([]() {})); } TEST(ThreadPoolTest, ConcurrentEnqueue) { ThreadPool pool(4, 16); std::atomic count(0); const int num_producers = 4; const int tasks_per_producer = 100; std::vector producers; for (int p = 0; p < num_producers; p++) { producers.emplace_back([&]() { for (int i = 0; i < tasks_per_producer; i++) { pool.enqueue([&count]() { count++; }); } }); } for (auto &t : producers) { t.join(); } pool.shutdown(); EXPECT_EQ(num_producers * tasks_per_producer, count.load()); }