Files
cpp-httplib/test/test_thread_pool.cc
yhirose 7d5082cc0e Make ThreadPool ctor exception-safe on partial thread creation (#2445)
* Make ThreadPool ctor exception-safe on partial thread creation

If std::thread construction throws partway through the ThreadPool
constructor (e.g., pthread_create returns EAGAIN under thread-resource
pressure), the partially-built threads_ vector would destruct joinable
std::thread objects, calling std::terminate(). Wrap the spawn loop and,
on failure, signal shutdown to the workers already created, join them,
and rethrow.

Adds a reproducer test in test_thread_pool.cc that interposes
pthread_create at link time to deterministically fail the second call,
gated to POSIX + exceptions-enabled builds.

Fix #2444

* Strip ASAN from test_thread_pool to coexist with pthread_create override

Linux libasan installs its own pthread_create interceptor; our in-binary
symbol override sits on top of it and corrupts ASAN's thread bookkeeping,
which surfaces as "Joining already joined thread" on the very first test.
Disable ASAN for this small unit-test binary -- ThreadPool memory behavior
is still exercised under ASAN by the main `test` binary.
2026-05-09 21:13:40 -04:00

286 lines
7.5 KiB
C++

// ThreadPool unit tests
// Set a short idle timeout for faster shrink tests
#define CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT 1
#include <httplib.h>
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
using namespace httplib;
TEST(ThreadPoolTest, BasicTaskExecution) {
ThreadPool pool(4);
std::atomic<int> count(0);
for (int i = 0; i < 10; i++) {
pool.enqueue([&count]() { count++; });
}
pool.shutdown();
EXPECT_EQ(10, count.load());
}
TEST(ThreadPoolTest, FixedPoolWhenMaxEqualsBase) {
// max_n == 0 means max = base (fixed pool behavior)
ThreadPool pool(4);
std::atomic<int> count(0);
for (int i = 0; i < 100; i++) {
pool.enqueue([&count]() { count++; });
}
pool.shutdown();
EXPECT_EQ(100, count.load());
}
TEST(ThreadPoolTest, DynamicScaleUp) {
// base=2, max=8: block 2 base threads, then enqueue more tasks
ThreadPool pool(2, 8);
std::atomic<int> active(0);
std::atomic<int> max_active(0);
std::atomic<int> completed(0);
std::mutex barrier_mutex;
std::condition_variable barrier_cv;
bool release = false;
// Occupy all base threads with blocking tasks
for (int i = 0; i < 2; i++) {
pool.enqueue([&]() {
active++;
{
std::unique_lock<std::mutex> lock(barrier_mutex);
barrier_cv.wait(lock, [&] { return release; });
}
active--;
completed++;
});
}
// Wait for base threads to be occupied
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// These should trigger dynamic thread creation
for (int i = 0; i < 4; i++) {
pool.enqueue([&]() {
int cur = ++active;
// Track peak active count
int prev = max_active.load();
while (cur > prev && !max_active.compare_exchange_weak(prev, cur)) {}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
active--;
completed++;
});
}
// Wait for dynamic tasks to complete
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// Release the blocking tasks
{
std::unique_lock<std::mutex> lock(barrier_mutex);
release = true;
}
barrier_cv.notify_all();
pool.shutdown();
EXPECT_EQ(6, completed.load());
// More than 2 threads were active simultaneously
EXPECT_GT(max_active.load(), 2);
}
TEST(ThreadPoolTest, DynamicShrinkAfterIdle) {
// CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT is set to 1 second
ThreadPool pool(2, 8);
std::atomic<int> completed(0);
// Enqueue tasks that require dynamic threads
for (int i = 0; i < 8; i++) {
pool.enqueue([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
completed++;
});
}
// Wait for all tasks to complete + idle timeout + margin
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
// Now enqueue a simple task to verify the pool still works
// (base threads are still alive)
std::atomic<bool> final_task_done(false);
pool.enqueue([&]() { final_task_done = true; });
std::this_thread::sleep_for(std::chrono::milliseconds(100));
pool.shutdown();
EXPECT_EQ(8, completed.load());
EXPECT_TRUE(final_task_done.load());
}
TEST(ThreadPoolTest, ShutdownWithActiveDynamicThreads) {
ThreadPool pool(2, 8);
std::atomic<int> started(0);
std::mutex block_mutex;
std::condition_variable block_cv;
bool release = false;
// Start tasks on dynamic threads that block until released
for (int i = 0; i < 6; i++) {
pool.enqueue([&]() {
started++;
std::unique_lock<std::mutex> lock(block_mutex);
block_cv.wait(lock, [&] { return release; });
});
}
// Wait for tasks to start
std::this_thread::sleep_for(std::chrono::milliseconds(200));
EXPECT_GE(started.load(), 2);
// Release all blocked threads, then shutdown
{
std::unique_lock<std::mutex> lock(block_mutex);
release = true;
}
block_cv.notify_all();
pool.shutdown();
}
TEST(ThreadPoolTest, MaxQueuedRequests) {
// base=2, max=2 (fixed), mqr=3
ThreadPool pool(2, 2, 3);
std::mutex block_mutex;
std::condition_variable block_cv;
bool release = false;
// Block both threads
for (int i = 0; i < 2; i++) {
EXPECT_TRUE(pool.enqueue([&]() {
std::unique_lock<std::mutex> lock(block_mutex);
block_cv.wait(lock, [&] { return release; });
}));
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Fill the queue up to max_queued_requests
EXPECT_TRUE(pool.enqueue([]() {}));
EXPECT_TRUE(pool.enqueue([]() {}));
EXPECT_TRUE(pool.enqueue([]() {}));
// This should fail - queue is full
EXPECT_FALSE(pool.enqueue([]() {}));
// Release blocked threads
{
std::unique_lock<std::mutex> lock(block_mutex);
release = true;
}
block_cv.notify_all();
pool.shutdown();
}
#ifndef CPPHTTPLIB_NO_EXCEPTIONS
TEST(ThreadPoolTest, InvalidMaxThreadsThrows) {
// max_n < n should throw
EXPECT_THROW(ThreadPool(8, 4), std::invalid_argument);
}
#endif
// Issue #2444: ThreadPool constructor must be exception-safe when std::thread
// construction fails partway (e.g., pthread_create returns EAGAIN under thread
// resource pressure). Without proper handling, the partially-built threads_
// vector destroys joinable std::thread objects, calling std::terminate().
//
// We reproduce the failure portably by interposing pthread_create at link
// time: while the counter is armed, the first N calls succeed, the rest
// return EAGAIN. This is gated to POSIX + exceptions-enabled builds.
#ifndef CPPHTTPLIB_NO_EXCEPTIONS
#if defined(__unix__) || defined(__APPLE__)
#include <dlfcn.h>
#include <errno.h>
#include <pthread.h>
namespace {
// -1 = pass-through (default). >= 0 = number of remaining successful calls
// before EAGAIN is returned. Reset to -1 after each test that arms it.
std::atomic<int> g_pthread_create_remaining{-1};
} // namespace
extern "C" int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg) {
using fn_t =
int (*)(pthread_t *, const pthread_attr_t *, void *(*)(void *), void *);
static fn_t real = reinterpret_cast<fn_t>(dlsym(RTLD_NEXT, "pthread_create"));
int n = g_pthread_create_remaining.load(std::memory_order_relaxed);
if (n == 0) { return EAGAIN; }
if (n > 0) {
g_pthread_create_remaining.fetch_sub(1, std::memory_order_relaxed);
}
return real(thread, attr, start_routine, arg);
}
TEST(ThreadPoolTest, ConstructorRecoversWhenThreadCreationFails) {
// Allow only the first thread to spawn; subsequent pthread_create calls
// return EAGAIN, causing std::thread() to throw std::system_error mid-loop.
g_pthread_create_remaining.store(1);
bool caught = false;
try {
ThreadPool pool(/*n=*/4);
(void)pool;
} catch (const std::system_error &) { caught = true; } catch (...) {
caught = true;
}
// Disarm before any further test runs.
g_pthread_create_remaining.store(-1);
EXPECT_TRUE(caught);
}
#endif // POSIX
#endif // CPPHTTPLIB_NO_EXCEPTIONS
TEST(ThreadPoolTest, EnqueueAfterShutdownReturnsFalse) {
ThreadPool pool(2);
pool.shutdown();
EXPECT_FALSE(pool.enqueue([]() {}));
}
TEST(ThreadPoolTest, ConcurrentEnqueue) {
ThreadPool pool(4, 16);
std::atomic<int> count(0);
const int num_producers = 4;
const int tasks_per_producer = 100;
std::vector<std::thread> producers;
for (int p = 0; p < num_producers; p++) {
producers.emplace_back([&]() {
for (int i = 0; i < tasks_per_producer; i++) {
pool.enqueue([&count]() { count++; });
}
});
}
for (auto &t : producers) {
t.join();
}
pool.shutdown();
EXPECT_EQ(num_producers * tasks_per_producer, count.load());
}