diff --git a/README-websocket.md b/README-websocket.md index 9ca5a6b..7517048 100644 --- a/README-websocket.md +++ b/README-websocket.md @@ -3,15 +3,19 @@ A simple, blocking WebSocket implementation for C++11. > [!IMPORTANT] -> This is a blocking I/O WebSocket implementation using a thread-per-connection model. If you need high-concurrency WebSocket support with non-blocking/async I/O (e.g., thousands of simultaneous connections), this is not the one that you want. +> This is a blocking I/O WebSocket implementation using a thread-per-connection model (plus one heartbeat thread per connection). It is intended for small- to mid-scale workloads; handling large numbers of simultaneous WebSocket connections is outside the design target of this library. If you need high-concurrency WebSocket support with non-blocking/async I/O (e.g., thousands of simultaneous connections), this is not the one that you want. + +> [!NOTE] +> WebSocket extensions (`permessage-deflate` and others defined by RFC 6455) are **not supported**. If a client proposes an extension via `Sec-WebSocket-Extensions`, the server silently declines it — the negotiated connection always runs without extensions. ## Features -- **RFC 6455 compliant**: Full WebSocket protocol support +- **RFC 6455 compliant**: Full WebSocket protocol support (extensions are not implemented) - **Server and Client**: Both sides included - **SSL/TLS support**: `wss://` scheme for secure connections - **Text and Binary**: Both message types supported - **Automatic heartbeat**: Periodic Ping/Pong keeps connections alive +- **Unresponsive-peer detection**: Opt-in liveness check via `set_websocket_max_missed_pongs()` - **Subprotocol negotiation**: `Sec-WebSocket-Protocol` support for GraphQL, MQTT, etc. ## Quick Start @@ -352,6 +356,7 @@ if (ws.connect()) { | `CPPHTTPLIB_WEBSOCKET_READ_TIMEOUT_SECOND` | `300` | Read timeout for WebSocket connections (seconds) | | `CPPHTTPLIB_WEBSOCKET_CLOSE_TIMEOUT_SECOND` | `5` | Timeout for waiting peer's Close response (seconds) | | `CPPHTTPLIB_WEBSOCKET_PING_INTERVAL_SECOND` | `30` | Automatic Ping interval for heartbeat (seconds) | +| `CPPHTTPLIB_WEBSOCKET_MAX_MISSED_PONGS` | `0` (disabled) | Close the connection after N consecutive unacked pings | ### Runtime Ping Interval @@ -373,6 +378,20 @@ ws.set_websocket_ping_interval(10); // 10 seconds ws.set_websocket_ping_interval(0); ``` +### Unresponsive-Peer Detection (Pong Timeout) + +By default the heartbeat only sends pings — it does not enforce that pongs come back. To detect a silently dropped connection faster, enable the max-missed-pongs check. Once `max_missed_pongs` consecutive pings go unanswered, the heartbeat thread closes the connection with `CloseStatus::GoingAway` and the reason `"pong timeout"`. + +```cpp +ws.set_websocket_max_missed_pongs(2); // close after 2 consecutive unacked pings +``` + +The server side has the same `set_websocket_max_missed_pongs()`. + +With the default ping interval of 30 seconds, `max_missed_pongs = 2` detects a dead peer within ~60 seconds. The counter is reset every time a Pong frame is received, so the mechanism only works when your code is actively calling `read()` — exactly the pattern a normal WebSocket client already uses. + +**The default is `0`**, which means "never close the connection because of missing pongs." Pings are still sent on the heartbeat interval, but their responses are not checked. Even so, a dead connection does not linger forever: while your code is inside `read()`, `CPPHTTPLIB_WEBSOCKET_READ_TIMEOUT_SECOND` (default **300 seconds = 5 minutes**) acts as a backstop and `read()` fails if no frame arrives in time. `max_missed_pongs` is the knob for detecting an unresponsive peer faster than that 5-minute fallback. + ## Threading Model WebSocket connections share the same thread pool as HTTP requests. Each WebSocket connection occupies one thread for its entire lifetime. diff --git a/README.md b/README.md index 5df1ce5..c26f9cf 100644 --- a/README.md +++ b/README.md @@ -1462,7 +1462,11 @@ if (ws.connect()) { SSL is also supported via `wss://` scheme (e.g. `WebSocketClient("wss://example.com/ws")`). Subprotocol negotiation (`Sec-WebSocket-Protocol`) is supported via `SubProtocolSelector` callback. -> **Note:** WebSocket connections occupy a thread for their entire lifetime. If you plan to handle many simultaneous WebSocket connections, consider using a dynamic thread pool: `svr.new_task_queue = [] { return new ThreadPool(8, 64); };` +> **Note:** WebSocket connections occupy a thread for their entire lifetime (plus an additional thread per connection for heartbeat pings). This thread-per-connection model is intended for small- to mid-scale workloads; large numbers of simultaneous WebSocket connections are outside the design target of this library. If you expect many concurrent WebSocket clients, configure a dynamic thread pool (`svr.new_task_queue = [] { return new ThreadPool(8, 64); };`) and measure carefully. + +> **WebSocket extensions are not supported.** `permessage-deflate` and other RFC 6455 extensions are not implemented. If a client proposes them via `Sec-WebSocket-Extensions`, the server silently declines them in its handshake response. + +> **Unresponsive-peer detection.** Heartbeat pings also serve as a liveness probe when `set_websocket_max_missed_pongs(n)` is set: if the client sends `n` consecutive pings without receiving a pong, it will close the connection. Disabled by default (`0`). See [README-websocket.md](README-websocket.md) for more details. diff --git a/docs-src/pages/en/cookbook/w02-websocket-ping.md b/docs-src/pages/en/cookbook/w02-websocket-ping.md index 28054e2..9521c7f 100644 --- a/docs-src/pages/en/cookbook/w02-websocket-ping.md +++ b/docs-src/pages/en/cookbook/w02-websocket-ping.md @@ -57,4 +57,24 @@ Too short wastes bandwidth; too long and connections get dropped. As a rule of t > **Warning:** A very short ping interval spawns background work per connection and increases CPU usage. For servers with many connections, keep the interval modest. +## Detecting an unresponsive peer + +Sending pings alone doesn't tell you anything if the peer just silently dies — the TCP socket might still look open while the process on the other end is long gone. To catch that, enable the max-missed-pongs check: if N consecutive pings go unanswered, the connection is closed. + +```cpp +cli.set_websocket_max_missed_pongs(2); // close after 2 consecutive unacked pings +``` + +The server side has the same `set_websocket_max_missed_pongs()`. + +With a 30-second ping interval and `max_missed_pongs = 2`, a dead peer is detected within roughly 60 seconds and the connection is closed with `CloseStatus::GoingAway` and the reason `"pong timeout"`. + +The counter is reset whenever `read()` consumes an incoming Pong frame, so this only works if your code is actively calling `read()` in a loop — which is what a normal WebSocket client does anyway. + +### Why the default is 0 + +`max_missed_pongs` defaults to `0`, which means "never close the connection because of missing pongs." Pings are still sent on the heartbeat interval, but their responses aren't checked. If you want unresponsive-peer detection, set it explicitly to `1` or higher. + +Even with `0`, a dead connection won't linger forever: while your code is inside `read()`, `CPPHTTPLIB_WEBSOCKET_READ_TIMEOUT_SECOND` (default **300 seconds = 5 minutes**) acts as a backstop and `read()` fails if no frame arrives in time. Think of `max_missed_pongs` as the knob for detecting an unresponsive peer **faster** than that. + > For handling a closed connection, see [W03. Handle connection close](w03-websocket-close). diff --git a/docs-src/pages/ja/cookbook/w02-websocket-ping.md b/docs-src/pages/ja/cookbook/w02-websocket-ping.md index 6243d6f..d7b01ea 100644 --- a/docs-src/pages/ja/cookbook/w02-websocket-ping.md +++ b/docs-src/pages/ja/cookbook/w02-websocket-ping.md @@ -57,4 +57,24 @@ WebSocketプロトコルでは、PingフレームにはPongフレームで応答 > **Warning:** Ping間隔を極端に短くすると、WebSocket接続ごとにバックグラウンドでスレッドが走るので、CPU負荷が上がります。接続数が多いサーバーでは控えめな値に設定しましょう。 +## 無応答のピアを検出する + +Pingを送るだけでは、相手が「黙って落ちた」場合に気付けません。TCPの接続自体は生きているように見えるのに、相手のプロセスはもう応答しない、というケースです。これを検出するには、送ったPingに対してPongがN回連続で返ってこなかったら接続を切る、というオプションを有効にします。 + +```cpp +cli.set_websocket_max_missed_pongs(2); // 2回連続でPongが返ってこなければ切断 +``` + +サーバー側にも同じ`set_websocket_max_missed_pongs()`があります。 + +たとえばPing間隔が30秒で`max_missed_pongs = 2`なら、無応答のピアは約60秒で検出され、`CloseStatus::GoingAway`(理由は`"pong timeout"`)で接続が閉じられます。 + +この仕組みは`read()`を呼んでPongフレームを消費したタイミングでカウンタがリセットされます。つまり通常のWebSocketクライアントのように`read()`をループで回していれば、特に意識することなく動きます。 + +### デフォルトは無効 + +`max_missed_pongs`のデフォルトは`0`で、これは「Pongが何回返ってこなくてもこの仕組みでは切断しない」という意味です。Ping自体は送られ続けますが、応答の有無はチェックされません。無応答ピアを検出したい場合は明示的に`1`以上を設定してください。 + +ただし`0`のままでも最終的に接続が残り続けることはありません。`read()`を呼んでいる間は`CPPHTTPLIB_WEBSOCKET_READ_TIMEOUT_SECOND`(デフォルト**300秒 = 5分**)が保険として働き、フレームが一定時間来なければ`read()`が失敗します。つまり`max_missed_pongs`は「**もっと速く**無応答を検出したい」ときに使うオプションだと考えてください。 + > 接続が閉じたときの処理は[W03. 接続クローズをハンドリングする](w03-websocket-close)を参照してください。 diff --git a/httplib.h b/httplib.h index 08e6383..91439a3 100644 --- a/httplib.h +++ b/httplib.h @@ -205,6 +205,10 @@ #define CPPHTTPLIB_WEBSOCKET_PING_INTERVAL_SECOND 30 #endif +#ifndef CPPHTTPLIB_WEBSOCKET_MAX_MISSED_PONGS +#define CPPHTTPLIB_WEBSOCKET_MAX_MISSED_PONGS 0 +#endif + /* * Headers */ @@ -1720,6 +1724,8 @@ public: Server &set_websocket_ping_interval( const std::chrono::duration &duration); + Server &set_websocket_max_missed_pongs(int count); + bool bind_to_port(const std::string &host, int port, int socket_flags = 0); int bind_to_any_port(const std::string &host, int socket_flags = 0); bool listen_after_bind(); @@ -1756,6 +1762,7 @@ protected: size_t payload_max_length_ = CPPHTTPLIB_PAYLOAD_MAX_LENGTH; time_t websocket_ping_interval_sec_ = CPPHTTPLIB_WEBSOCKET_PING_INTERVAL_SECOND; + int websocket_max_missed_pongs_ = CPPHTTPLIB_WEBSOCKET_MAX_MISSED_PONGS; private: using Handlers = @@ -3728,17 +3735,21 @@ private: WebSocket( Stream &strm, const Request &req, bool is_server, - time_t ping_interval_sec = CPPHTTPLIB_WEBSOCKET_PING_INTERVAL_SECOND) + time_t ping_interval_sec = CPPHTTPLIB_WEBSOCKET_PING_INTERVAL_SECOND, + int max_missed_pongs = CPPHTTPLIB_WEBSOCKET_MAX_MISSED_PONGS) : strm_(strm), req_(req), is_server_(is_server), - ping_interval_sec_(ping_interval_sec) { + ping_interval_sec_(ping_interval_sec), + max_missed_pongs_(max_missed_pongs) { start_heartbeat(); } WebSocket( std::unique_ptr &&owned_strm, const Request &req, bool is_server, - time_t ping_interval_sec = CPPHTTPLIB_WEBSOCKET_PING_INTERVAL_SECOND) + time_t ping_interval_sec = CPPHTTPLIB_WEBSOCKET_PING_INTERVAL_SECOND, + int max_missed_pongs = CPPHTTPLIB_WEBSOCKET_MAX_MISSED_PONGS) : strm_(*owned_strm), owned_strm_(std::move(owned_strm)), req_(req), - is_server_(is_server), ping_interval_sec_(ping_interval_sec) { + is_server_(is_server), ping_interval_sec_(ping_interval_sec), + max_missed_pongs_(max_missed_pongs) { start_heartbeat(); } @@ -3750,6 +3761,8 @@ private: Request req_; bool is_server_; time_t ping_interval_sec_; + int max_missed_pongs_; + int unacked_pings_ = 0; std::atomic closed_{false}; std::mutex write_mutex_; std::thread ping_thread_; @@ -3779,6 +3792,7 @@ public: void set_read_timeout(time_t sec, time_t usec = 0); void set_write_timeout(time_t sec, time_t usec = 0); void set_websocket_ping_interval(time_t sec); + void set_websocket_max_missed_pongs(int count); void set_tcp_nodelay(bool on); void set_address_family(int family); void set_ipv6_v6only(bool on); @@ -3810,6 +3824,7 @@ private: time_t write_timeout_usec_ = CPPHTTPLIB_CLIENT_WRITE_TIMEOUT_USECOND; time_t websocket_ping_interval_sec_ = CPPHTTPLIB_WEBSOCKET_PING_INTERVAL_SECOND; + int websocket_max_missed_pongs_ = CPPHTTPLIB_WEBSOCKET_MAX_MISSED_PONGS; int address_family_ = AF_UNSPEC; bool tcp_nodelay_ = CPPHTTPLIB_TCP_NODELAY; bool ipv6_v6only_ = CPPHTTPLIB_IPV6_V6ONLY; @@ -10912,6 +10927,11 @@ inline Server &Server::set_payload_max_length(size_t length) { return *this; } +inline Server &Server::set_websocket_max_missed_pongs(int count) { + websocket_max_missed_pongs_ = count; + return *this; +} + inline Server &Server::set_websocket_ping_interval(time_t sec) { websocket_ping_interval_sec_ = sec; return *this; @@ -12050,7 +12070,8 @@ Server::process_request(Stream &strm, const std::string &remote_addr, { // Use WebSocket-specific read timeout instead of HTTP timeout strm.set_read_timeout(CPPHTTPLIB_WEBSOCKET_READ_TIMEOUT_SECOND, 0); - ws::WebSocket ws(strm, req, true, websocket_ping_interval_sec_); + ws::WebSocket ws(strm, req, true, websocket_ping_interval_sec_, + websocket_max_missed_pongs_); entry.handler(req, ws); } return true; @@ -19700,7 +19721,11 @@ inline ReadResult WebSocket::read(std::string &msg) { payload.size(), true, !is_server_); continue; } - case Opcode::Pong: continue; + case Opcode::Pong: { + std::lock_guard lock(ping_mutex_); + unacked_pings_ = 0; + continue; + } case Opcode::Close: { if (!closed_.exchange(true)) { // Echo close frame back @@ -19734,7 +19759,11 @@ inline ReadResult WebSocket::read(std::string &msg) { true, !is_server_); continue; } - if (cont_opcode == Opcode::Pong) { continue; } + if (cont_opcode == Opcode::Pong) { + std::lock_guard lock(ping_mutex_); + unacked_pings_ = 0; + continue; + } if (cont_opcode == Opcode::Close) { if (!closed_.exchange(true)) { std::lock_guard lock(write_mutex_); @@ -19822,12 +19851,22 @@ inline void WebSocket::start_heartbeat() { while (!closed_) { ping_cv_.wait_for(lock, std::chrono::seconds(ping_interval_sec_)); if (closed_) { break; } + // If the peer has failed to respond to the previous pings, give up. + // RFC 6455 does not define a pong-timeout mechanism; this is an + // opt-in liveness check controlled by max_missed_pongs_. + if (max_missed_pongs_ > 0 && unacked_pings_ >= max_missed_pongs_) { + lock.unlock(); + close(CloseStatus::GoingAway, "pong timeout"); + return; + } lock.unlock(); if (!send_frame(Opcode::Ping, nullptr, 0)) { + lock.lock(); closed_ = true; break; } lock.lock(); + unacked_pings_++; } }); } @@ -19955,8 +19994,9 @@ inline bool WebSocketClient::connect() { Request req; req.method = "GET"; req.path = path_; - ws_ = std::unique_ptr( - new WebSocket(std::move(strm), req, false, websocket_ping_interval_sec_)); + ws_ = std::unique_ptr(new WebSocket(std::move(strm), req, false, + websocket_ping_interval_sec_, + websocket_max_missed_pongs_)); return true; } @@ -20000,6 +20040,10 @@ inline void WebSocketClient::set_websocket_ping_interval(time_t sec) { websocket_ping_interval_sec_ = sec; } +inline void WebSocketClient::set_websocket_max_missed_pongs(int count) { + websocket_max_missed_pongs_ = count; +} + inline void WebSocketClient::set_tcp_nodelay(bool on) { tcp_nodelay_ = on; } inline void WebSocketClient::set_address_family(int family) { diff --git a/test/test_websocket_heartbeat.cc b/test/test_websocket_heartbeat.cc index a1656e1..4aa236a 100644 --- a/test/test_websocket_heartbeat.cc +++ b/test/test_websocket_heartbeat.cc @@ -136,6 +136,86 @@ TEST_F(WebSocketServerPingIntervalTest, ServerRuntimeInterval) { client.close(); } +// Verify that the client detects a non-responsive peer via unacked-ping count. +// Setup: the server's heartbeat is disabled AND its handler never calls +// read(), so no automatic Pong reply is ever produced. The client sends +// pings but receives no pongs, and should close itself once the unacked +// ping count reaches max_missed_pongs. +class WebSocketPongTimeoutTest : public ::testing::Test { +protected: + void SetUp() override { + svr_.set_websocket_ping_interval(0); + svr_.WebSocket("/ws", [this](const Request &, ws::WebSocket &) { + std::unique_lock lock(handler_mutex_); + handler_cv_.wait(lock, [this]() { return release_; }); + }); + + port_ = svr_.bind_to_any_port("localhost"); + thread_ = std::thread([this]() { svr_.listen_after_bind(); }); + svr_.wait_until_ready(); + } + + void TearDown() override { + { + std::lock_guard lock(handler_mutex_); + release_ = true; + } + handler_cv_.notify_all(); + svr_.stop(); + thread_.join(); + } + + Server svr_; + int port_; + std::thread thread_; + std::mutex handler_mutex_; + std::condition_variable handler_cv_; + bool release_ = false; +}; + +TEST_F(WebSocketPongTimeoutTest, ClientDetectsNonResponsivePeer) { + ws::WebSocketClient client("ws://localhost:" + std::to_string(port_) + "/ws"); + client.set_websocket_max_missed_pongs(2); + ASSERT_TRUE(client.connect()); + ASSERT_TRUE(client.is_open()); + + // Client pings every 1s (compile-time default in this test file). + // With max_missed_pongs = 2, the heartbeat thread should self-close within + // roughly 3s. Poll is_open() up to 6s. + auto start = std::chrono::steady_clock::now(); + while (client.is_open() && + std::chrono::steady_clock::now() - start < std::chrono::seconds(6)) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + EXPECT_FALSE(client.is_open()); +} + +// Verify that a responsive peer does NOT trigger the pong-timeout mechanism, +// even with a small max_missed_pongs budget. This is the positive counterpart +// of ClientDetectsNonResponsivePeer: the client must actively drive read() so +// that incoming Pong frames are consumed and the unacked counter is reset. +TEST_F(WebSocketHeartbeatTest, ResponsivePeerNeverTimesOut) { + ws::WebSocketClient client("ws://localhost:" + std::to_string(port_) + "/ws"); + client.set_websocket_max_missed_pongs(2); + ASSERT_TRUE(client.connect()); + + // Interactive loop over ~6s, longer than 2 ping intervals, so the + // pong-timeout mechanism would trigger if pongs weren't being consumed. + // Each iteration's read() also drains any pending Pong frame. + for (int i = 0; i < 6; i++) { + std::string text = "keepalive" + std::to_string(i); + ASSERT_TRUE(client.send(text)); + std::string msg; + ASSERT_TRUE(client.read(msg)); + EXPECT_EQ(text, msg); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + EXPECT_TRUE(client.is_open()); + client.close(); +} + // Verify that multiple heartbeat cycles work TEST_F(WebSocketHeartbeatTest, MultipleHeartbeatCycles) { ws::WebSocketClient client("ws://localhost:" + std::to_string(port_) + "/ws");