การศึกษาเชิงลึก: การสื่อสาร WebSocket และ SSE
การสื่อสารแบบเรียลไทม์เป็นรากฐานสำหรับแอปพลิเคชันสมัยใหม่ ไม่ว่าคุณกำลังสร้างแอปพลิเคชันแชท, การแจ้งเตือนแบบสด, เครื่องมือการแก้ไขร่วมกัน, หรือแดชบอร์ดการซื้อขาย, การเลือก protocol ที่เหมาะสมคือสิ่งสำคัญ คู่มือนี้สำรวจ WebSocket และ Server-Sent Events (SSE) — กลไกที่มีประสิทธิภาพสองแบบสำหรับการเชื่อมต่อถาวร, แบบสองทิศ, หรือการสื่อสารที่เริ่มจากเซิร์ฟเวอร์ — และครอบคลุมรูปแบบการใช้งานจริงสำหรับการปรับขนาดและความเชื่อถือได้
WebSocket และ SSE คืออะไร
HTTP แบบดั้งเดิมปฏิบัติตามรูปแบบ request-response: ไคลเอนต์เริ่มต้น, เซิร์ฟเวอร์ตอบสนอง, การเชื่อมต่อปิด รูปแบบนี้ไม่เหมาะสำหรับสถานการณ์เรียลไทม์ที่เซิร์ฟเวอร์ต้องการส่งอัปเดตให้กับไคลเอนต์อย่างอิสระ
WebSocket และ SSE แก้ปัญหานี้ผ่านการเชื่อมต่อถาวร, แต่ใช้วิธีการที่แตกต่างกัน:
-
WebSocket: อัปเกรด HTTP connection เป็น protocol แบบ full-duplex และ bidirectional ทั้งคู่ หลังจากสร้างแล้ว, ทั้งไคลเอนต์และเซิร์ฟเวอร์สามารถส่งข้อความได้ตลอดเวลาผ่าน TCP connection เดียวกัน สร้างบน TCP, มันจึงให้ latency ต่ำและสนับสนุน binary frames
-
Server-Sent Events (SSE): ใช้ HTTP เป็น transport และสร้าง one-way channel ที่เซิร์ฟเวอร์ส่ง text-based events ให้กับไคลเอนต์ ไคลเอนต์สามารถส่งข้อความได้ผ่าน HTTP requests แยกต่างหาก ง่ายกว่า, ใช้ HTTP infrastructure มาตรฐาน, แต่เป็น unidirectional
diagram ลำดับต่อไปนี้แสดงการไหลของ WebSocket ทั่วไป:
sequenceDiagram
participant Client
participant Server
Note over Client,Server: WebSocket Upgrade Handshake
Client->>Server: HTTP Upgrade Request (Connection, Upgrade headers)
Server->>Client: HTTP 101 Switching Protocols
Note over Client,Server: Persistent Connection Established
Server->>Client: Push Update (Frame)
Client->>Server: Client Message (Frame)
Server->>Client: Push Update (Frame)
Client->>Server: Ping
Server->>Client: Pong
Note over Client,Server: Connection Close
Client->>Server: Close Frame
Server->>Client: Close Frame (Acknowledgment)
หลักการหลัก
- Persistent Connections: ทั้ง WebSocket และ SSE รักษา TCP connections เปิด, ช่วยขจัด overhead ของ HTTP handshakes ซ้ำ ๆ
- Low Latency: ข้อความไหลทันที โดยไม่มีการหน่วงเวลาการสำรวจหรือ request round-trips
- Resource Efficiency: การเชื่อมต่อเดียวจัดการข้อความทั้งหมด bidirectional (WebSocket) หรือเริ่มจากเซิร์ฟเวอร์ (SSE)
- Scalability Challenges: Persistent connections บริโภค server resources; การปรับขนาดตามแนวนอนต้องการการประสานงานผ่าน message brokers (Redis pub/sub)
- Protocol Overhead: WebSocket frames เพิ่ม minimal overhead (2-10 bytes ต่อ frame) SSE ใช้ HTTP semantics, หนักกว่าเล็กน้อยแต่เข้ากันได้กับ proxies
- Browser Support: ทั้งสองได้รับการสนับสนุนอย่างดีในเบราว์เซอร์สมัยใหม่; WebSocket ต้องการ explicit protocol negotiation, SSE ใช้ EventSource API
WebSocket Protocol
Upgrade Handshake
WebSocket เริ่มต้นด้วย HTTP upgrade ไคลเอนต์ส่ง HTTP GET request ที่มี headers ที่เฉพาะเจาะจง:
GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Version: 13
เซิร์ฟเวอร์ตอบสนองด้วย HTTP 101:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
Sec-WebSocket-Key เป็น base64-encoded random nonce; เซิร์ฟเวอร์คำนวณ Sec-WebSocket-Accept โดยต่อ key กับ magic string (258EAFA5-E910-5DAB-8B74-38787C8F4C7E), SHA-1 hashing, และ base64 encoding ซึ่งป้องกันการ cache poisoning และยืนยันความสอดคล้องกับ protocol
WebSocket Frames
หลังจากอัปเกรด, ข้อมูลไหลเป็น frames แต่ละ frame มี:
- FIN bit (1 bit): ทำเครื่องหมายส่วนสุดท้ายของข้อความ
- Opcode (4 bits): กำหนด frame type (ดูด้านล่าง)
- Mask bit (1 bit): Frames จากไคลเอนต์ไปเซิร์ฟเวอร์ถูก masked เพื่อความปลอดภัย; frames จากเซิร์ฟเวอร์ไปไคลเอนต์ไม่ได้ถูก masked
- Payload length (7, 21, หรือ 63 bits): ขนาด message
- Masking key (32 bits, ถ้า masked): XOR key สำหรับ payload encryption
- Payload data: ข้อความจริง
Opcodes
| Opcode | Name | Purpose |
|---|---|---|
| 0x0 | Continuation | Continuation ของ fragmented message |
| 0x1 | Text | UTF-8 text message |
| 0x2 | Binary | Binary data |
| 0x8 | Close | เริ่มต้นการปิด connection |
| 0x9 | Ping | ขอ pong (connection health check) |
| 0xA | Pong | ตอบสนอง ping |
การใช้งาน WebSocket ด้วย Node.js
WebSocket Server (ws library)
ws library เป็น lightweight, production-ready WebSocket server implementation
import WebSocket, { Server as WebSocketServer } from 'ws';
import http from 'http';
import { verifyToken } from './auth';
const server = http.createServer();
const wss = new WebSocketServer({ server });
interface AuthenticatedWebSocket extends WebSocket {
userId?: string;
isAlive?: boolean;
}
// Track active connections per user
const userConnections = new Map<string, Set<AuthenticatedWebSocket>>();
// Middleware-style authentication
function verifyClient(
info: { origin: string; secure: boolean; req: http.IncomingMessage },
callback: (success: boolean, code?: number, message?: string) => void,
) {
const url = new URL(info.req.url || '', 'http://localhost');
const token = url.searchParams.get('token');
if (!token) {
callback(false, 4001, 'Token required');
return;
}
try {
const decoded = verifyToken(token);
(info.req as any).userId = decoded.sub;
callback(true);
} catch (err) {
callback(false, 4001, 'Invalid token');
}
}
const wss = new WebSocketServer({
server,
verifyClient,
clientTracking: false, // Manage connections manually for multi-server setups
});
wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
const userId = (req as any).userId;
ws.userId = userId;
ws.isAlive = true;
// Track connection
if (!userConnections.has(userId)) {
userConnections.set(userId, new Set());
}
userConnections.get(userId)!.add(ws);
console.log(`User ${userId} connected`);
// Heartbeat
ws.on('pong', () => {
ws.isAlive = true;
});
// Handle incoming messages
ws.on('message', async (data: WebSocket.Data) => {
try {
const message = JSON.parse(data.toString());
switch (message.type) {
case 'chat':
broadcastToUser(message.recipientId, {
type: 'chat',
from: userId,
text: message.text,
timestamp: new Date(),
});
break;
case 'presence':
broadcastToUser(userId, {
type: 'user_status',
userId,
status: message.status,
});
break;
default:
ws.send(JSON.stringify({ error: 'Unknown message type' }));
}
} catch (err) {
ws.send(JSON.stringify({ error: 'Invalid message format' }));
}
});
// Handle errors
ws.on('error', (err) => {
console.error(`WebSocket error for user ${userId}:`, err);
});
// Handle close
ws.on('close', (code, reason) => {
const connections = userConnections.get(userId);
if (connections) {
connections.delete(ws);
if (connections.size === 0) {
userConnections.delete(userId);
}
}
console.log(`User ${userId} disconnected (code: ${code})`);
});
});
// Heartbeat interval (detects dead connections)
setInterval(() => {
wss.clients.forEach((ws: AuthenticatedWebSocket) => {
if (ws.isAlive === false) {
ws.terminate();
return;
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
function broadcastToUser(userId: string, message: any) {
const connections = userConnections.get(userId);
if (connections) {
connections.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
});
}
}
server.listen(3000, () => {
console.log('WebSocket server listening on port 3000');
});// Spring Boot WebSocket Server with STOMP
// pom.xml: spring-boot-starter-websocket, jjwt
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import org.springframework.web.socket.config.annotation.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic", "/queue");
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
}
@Controller
@EnableScheduling
public class WebSocketController {
private final SimpMessagingTemplate messagingTemplate;
private final Map<String, Set<String>> userSessions = new ConcurrentHashMap<>();
public WebSocketController(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
@MessageMapping("/chat")
public void handleChat(@Payload ChatMessage message, Principal principal) {
String fromUserId = principal.getName();
ChatMessage outgoing = new ChatMessage(
"chat", fromUserId, message.getText(), new Date()
);
// Route to recipient's personal queue
messagingTemplate.convertAndSendToUser(
message.getRecipientId(), "/queue/messages", outgoing
);
}
@MessageMapping("/presence")
public void handlePresence(@Payload PresenceMessage message, Principal principal) {
String userId = principal.getName();
messagingTemplate.convertAndSendToUser(
userId, "/queue/status",
new StatusMessage(userId, message.getStatus())
);
}
// Heartbeat: Spring STOMP handles ping/pong at the protocol level
@Scheduled(fixedDelay = 30000)
public void sendHeartbeat() {
messagingTemplate.convertAndSend("/topic/heartbeat", Map.of("type", "ping"));
}
public record ChatMessage(String type, String from, String text, Date timestamp) {
public String getRecipientId() { return ""; } // set via payload
public String getText() { return text; }
public String getRecipientId(String s) { return s; }
}
public record PresenceMessage(String status) {
public String getStatus() { return status; }
}
public record StatusMessage(String userId, String status) {}
}# FastAPI WebSocket Server
# pip install fastapi uvicorn websockets python-jose
import asyncio
import json
from collections import defaultdict
from datetime import datetime
from typing import Dict, Set
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, status
from jose import jwt, JWTError
app = FastAPI()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
# Track active connections per user
user_connections: Dict[str, Set[WebSocket]] = defaultdict(set)
def verify_token(token: str) -> dict:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
# Authenticate before accepting
try:
payload = verify_token(token)
user_id = payload["sub"]
except JWTError:
await websocket.close(code=4001)
return
await websocket.accept()
user_connections[user_id].add(websocket)
print(f"User {user_id} connected")
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
if message["type"] == "chat":
await broadcast_to_user(
message["recipientId"],
{
"type": "chat",
"from": user_id,
"text": message["text"],
"timestamp": datetime.utcnow().isoformat(),
},
)
elif message["type"] == "presence":
await broadcast_to_user(
user_id,
{"type": "user_status", "userId": user_id, "status": message["status"]},
)
else:
await websocket.send_text(json.dumps({"error": "Unknown message type"}))
except WebSocketDisconnect:
user_connections[user_id].discard(websocket)
if not user_connections[user_id]:
del user_connections[user_id]
print(f"User {user_id} disconnected")
except Exception as e:
print(f"WebSocket error for user {user_id}: {e}")
user_connections[user_id].discard(websocket)
async def broadcast_to_user(user_id: str, message: dict):
connections = user_connections.get(user_id, set())
dead = set()
for ws in connections:
try:
await ws.send_text(json.dumps(message))
except Exception:
dead.add(ws)
connections -= dead
# Heartbeat task
async def heartbeat_task():
while True:
await asyncio.sleep(30)
for user_id, connections in list(user_connections.items()):
dead = set()
for ws in connections:
try:
await ws.send_text(json.dumps({"type": "ping"}))
except Exception:
dead.add(ws)
connections -= dead
@app.on_event("startup")
async def startup():
asyncio.create_task(heartbeat_task())// ASP.NET Core WebSocket Server
// No extra packages needed — built-in WebSocket support
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.UseWebSockets(new WebSocketOptions { KeepAliveInterval = TimeSpan.FromSeconds(30) });
// Track connections per user
var userConnections = new ConcurrentDictionary<string, ConcurrentBag<WebSocket>>();
app.Map("/ws", async (HttpContext context) =>
{
if (!context.WebSockets.IsWebSocketRequest)
{
context.Response.StatusCode = 400;
return;
}
var token = context.Request.Query["token"].ToString();
var userId = ValidateToken(token); // implement your JWT validation
if (userId == null)
{
context.Response.StatusCode = 401;
return;
}
var ws = await context.WebSockets.AcceptWebSocketAsync();
userConnections.GetOrAdd(userId, _ => new ConcurrentBag<WebSocket>()).Add(ws);
Console.WriteLine($"User {userId} connected");
var buffer = new byte[4096];
try
{
while (ws.State == WebSocketState.Open)
{
var result = await ws.ReceiveAsync(buffer, CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close)
break;
var json = Encoding.UTF8.GetString(buffer, 0, result.Count);
var message = JsonSerializer.Deserialize<JsonElement>(json);
var type = message.GetProperty("type").GetString();
if (type == "chat")
{
var recipientId = message.GetProperty("recipientId").GetString()!;
var outgoing = JsonSerializer.Serialize(new
{
type = "chat",
from = userId,
text = message.GetProperty("text").GetString(),
timestamp = DateTime.UtcNow,
});
await BroadcastToUser(userConnections, recipientId, outgoing);
}
}
}
finally
{
var bag = userConnections.GetValueOrDefault(userId);
// ConcurrentBag doesn't support remove; swap to a cleaner structure in production
Console.WriteLine($"User {userId} disconnected");
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Bye", CancellationToken.None);
}
});
static async Task BroadcastToUser(
ConcurrentDictionary<string, ConcurrentBag<WebSocket>> connections,
string userId, string message)
{
if (!connections.TryGetValue(userId, out var sockets)) return;
var bytes = Encoding.UTF8.GetBytes(message);
foreach (var ws in sockets)
{
if (ws.State == WebSocketState.Open)
await ws.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None);
}
}
static string? ValidateToken(string token)
{
// Implement JWT validation; return userId or null
return null;
}
app.Run();WebSocket Client (Browser)
class WebSocketClient {
private ws: WebSocket | null = null;
private url: string;
private token: string;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 1000;
constructor(url: string, token: string) {
this.url = url;
this.token = token;
}
connect(): Promise<void> {
return new Promise((resolve, reject) => {
try {
this.ws = new WebSocket(
`${this.url}?token=${encodeURIComponent(this.token)}`,
);
this.ws.onopen = () => {
console.log('Connected');
this.reconnectAttempts = 0;
resolve();
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
this.ws.onerror = (event) => {
console.error('WebSocket error:', event);
reject(new Error('Connection failed'));
};
this.ws.onclose = () => {
console.log('Disconnected');
this.attemptReconnect();
};
} catch (err) {
reject(err);
}
});
}
send(message: any) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
console.warn('WebSocket not connected');
}
}
private attemptReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(() => this.connect().catch(console.error), delay);
}
}
private handleMessage(message: any) {
switch (message.type) {
case 'chat':
console.log(`Message from ${message.from}: ${message.text}`);
break;
default:
console.log('Received:', message);
}
}
disconnect() {
if (this.ws) {
this.ws.close(1000, 'Normal closure');
}
}
}
// Usage
const client = new WebSocketClient(
'ws://localhost:3000/ws',
'your-jwt-token',
);
await client.connect();
client.send({ type: 'chat', recipientId: 'user123', text: 'Hello!' });// Java WebSocket Client using Jakarta WebSocket API
// Maven: jakarta.websocket-api, tyrus-standalone-client
import jakarta.websocket.*;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ClientEndpoint
public class WebSocketClient {
private Session session;
private final String url;
private final String token;
private int reconnectAttempts = 0;
private final int maxReconnectAttempts = 5;
private final long reconnectDelayMs = 1000;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public WebSocketClient(String url, String token) {
this.url = url;
this.token = token;
}
public CompletableFuture<Void> connect() {
var future = new CompletableFuture<Void>();
try {
var container = ContainerProvider.getWebSocketContainer();
container.connectToServer(this, URI.create(url + "?token=" + token));
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}
@OnOpen
public void onOpen(Session session) {
this.session = session;
this.reconnectAttempts = 0;
System.out.println("Connected");
}
@OnMessage
public void onMessage(String messageJson) {
// Parse and handle message
System.out.println("Received: " + messageJson);
}
@OnClose
public void onClose(Session session, CloseReason reason) {
System.out.println("Disconnected: " + reason);
attemptReconnect();
}
@OnError
public void onError(Session session, Throwable error) {
System.err.println("WebSocket error: " + error.getMessage());
}
public void send(String messageJson) {
if (session != null && session.isOpen()) {
session.getAsyncRemote().sendText(messageJson);
} else {
System.out.println("WebSocket not connected");
}
}
private void attemptReconnect() {
if (reconnectAttempts < maxReconnectAttempts) {
long delay = reconnectDelayMs * (1L << reconnectAttempts);
reconnectAttempts++;
System.out.println("Reconnecting in " + delay + "ms...");
scheduler.schedule(() -> connect(), delay, TimeUnit.MILLISECONDS);
}
}
public void disconnect() {
if (session != null && session.isOpen()) {
try { session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Normal closure")); }
catch (Exception ignored) {}
}
}
}
// Usage
var client = new WebSocketClient("ws://localhost:3000/ws", "your-jwt-token");
client.connect().join();
client.send("{\"type\":\"chat\",\"recipientId\":\"user123\",\"text\":\"Hello!\"}");# Python WebSocket Client
# pip install websockets
import asyncio
import json
import math
from urllib.parse import quote
import websockets
from websockets.exceptions import ConnectionClosed
class WebSocketClient:
def __init__(self, url: str, token: str):
self.url = url
self.token = token
self.ws = None
self.reconnect_attempts = 0
self.max_reconnect_attempts = 5
self.reconnect_delay = 1.0
async def connect(self):
uri = f"{self.url}?token={quote(self.token)}"
self.ws = await websockets.connect(uri)
self.reconnect_attempts = 0
print("Connected")
asyncio.create_task(self._receive_loop())
async def _receive_loop(self):
try:
async for raw in self.ws:
message = json.loads(raw)
self._handle_message(message)
except ConnectionClosed:
print("Disconnected")
await self._attempt_reconnect()
def _handle_message(self, message: dict):
if message.get("type") == "chat":
print(f"Message from {message['from']}: {message['text']}")
else:
print("Received:", message)
async def send(self, message: dict):
if self.ws and not self.ws.closed:
await self.ws.send(json.dumps(message))
else:
print("WebSocket not connected")
async def _attempt_reconnect(self):
if self.reconnect_attempts < self.max_reconnect_attempts:
delay = self.reconnect_delay * (2 ** self.reconnect_attempts)
self.reconnect_attempts += 1
print(f"Reconnecting in {delay:.0f}ms...")
await asyncio.sleep(delay / 1000)
await self.connect()
async def disconnect(self):
if self.ws:
await self.ws.close(1000, "Normal closure")
# Usage
async def main():
client = WebSocketClient("ws://localhost:3000/ws", "your-jwt-token")
await client.connect()
await client.send({"type": "chat", "recipientId": "user123", "text": "Hello!"})
await asyncio.sleep(5)
await client.disconnect()
asyncio.run(main())// C# WebSocket Client (.NET 8)
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
public class WebSocketClient
{
private ClientWebSocket? _ws;
private readonly string _url;
private readonly string _token;
private int _reconnectAttempts = 0;
private const int MaxReconnectAttempts = 5;
private const int ReconnectDelayMs = 1000;
public WebSocketClient(string url, string token)
{
_url = url;
_token = token;
}
public async Task ConnectAsync()
{
_ws = new ClientWebSocket();
await _ws.ConnectAsync(new Uri($"{_url}?token={Uri.EscapeDataString(_token)}"), CancellationToken.None);
_reconnectAttempts = 0;
Console.WriteLine("Connected");
_ = ReceiveLoopAsync();
}
private async Task ReceiveLoopAsync()
{
var buffer = new byte[4096];
try
{
while (_ws!.State == WebSocketState.Open)
{
var result = await _ws.ReceiveAsync(buffer, CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close) break;
var json = Encoding.UTF8.GetString(buffer, 0, result.Count);
var message = JsonSerializer.Deserialize<JsonElement>(json);
HandleMessage(message);
}
}
catch (Exception ex)
{
Console.Error.WriteLine($"WebSocket error: {ex.Message}");
}
Console.WriteLine("Disconnected");
await AttemptReconnectAsync();
}
private void HandleMessage(JsonElement message)
{
var type = message.GetProperty("type").GetString();
if (type == "chat")
Console.WriteLine($"Message from {message.GetProperty("from").GetString()}: {message.GetProperty("text").GetString()}");
else
Console.WriteLine($"Received: {message}");
}
public async Task SendAsync(object message)
{
if (_ws?.State == WebSocketState.Open)
{
var json = JsonSerializer.Serialize(message);
await _ws.SendAsync(Encoding.UTF8.GetBytes(json), WebSocketMessageType.Text, true, CancellationToken.None);
}
else
{
Console.WriteLine("WebSocket not connected");
}
}
private async Task AttemptReconnectAsync()
{
if (_reconnectAttempts < MaxReconnectAttempts)
{
var delay = ReconnectDelayMs * (int)Math.Pow(2, _reconnectAttempts);
_reconnectAttempts++;
Console.WriteLine($"Reconnecting in {delay}ms...");
await Task.Delay(delay);
await ConnectAsync();
}
}
public async Task DisconnectAsync()
{
if (_ws != null)
await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Normal closure", CancellationToken.None);
}
}
// Usage
var client = new WebSocketClient("ws://localhost:3000/ws", "your-jwt-token");
await client.ConnectAsync();
await client.SendAsync(new { type = "chat", recipientId = "user123", text = "Hello!" });Server-Sent Events (SSE)
EventSource API
SSE ใช้ browser’s native EventSource API เซิร์ฟเวอร์ส่ง HTTP response headers:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
จากนั้นส่งข้อมูลเป็น newline-delimited events:
event: message
data: {"type":"chat","from":"user1","text":"Hello"}
event: notification
data: New notification arrived
: This is a comment (ignored by client)
data: {"action":"update","timestamp":"2025-04-20T10:30:00Z"}
retry: 5000
Fields:
- event: Custom event type (JavaScript
addEventListenerจะ fire event นี้) - data: Payload (สามารถเป็น JSON หรือ plain text)
- id: Last-event-id สำหรับ reconnection (เซิร์ฟเวอร์ส่งสิ่งที่ไคลเอนต์ควร retry จาก)
- retry: Milliseconds ก่อนที่ browser จะ reconnect โดยอัตโนมัติ
- Comments: Lines ที่เริ่มต้นด้วย
:จะถูกละเว้น
การใช้งาน SSE ด้วย Express
import express from 'express';
import { verifyToken } from './auth';
const app = express();
interface SSEClient {
userId: string;
res: express.Response;
lastEventId?: string;
}
const sseClients = new Map<string, Set<SSEClient>>();
// SSE endpoint
app.get('/events', (req, res) => {
const token = req.query.token as string;
if (!token) {
res.status(401).json({ error: 'Token required' });
return;
}
try {
const decoded = verifyToken(token);
const userId = decoded.sub;
// Set SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
// Support reconnection with Last-Event-ID
const lastEventId = req.headers['last-event-id'] as string | undefined;
const client: SSEClient = { userId, res, lastEventId };
// Track client
if (!sseClients.has(userId)) {
sseClients.set(userId, new Set());
}
sseClients.get(userId)!.add(client);
console.log(`SSE client connected: ${userId}`);
// Send initial heartbeat
res.write(': connected\n\n');
// Handle client disconnect
res.on('close', () => {
const clients = sseClients.get(userId);
if (clients) {
clients.delete(client);
if (clients.size === 0) {
sseClients.delete(userId);
}
}
console.log(`SSE client disconnected: ${userId}`);
});
res.on('error', (err) => {
console.error(`SSE error for user ${userId}:`, err);
res.end();
});
} catch (err) {
res.status(401).json({ error: 'Invalid token' });
}
});
// Broadcast function
function broadcastEventToUser(userId: string, eventType: string, data: any, eventId?: string) {
const clients = sseClients.get(userId);
if (clients) {
clients.forEach((client) => {
if (!client.res.closed) {
client.res.write(`event: ${eventType}\n`);
if (eventId) {
client.res.write(`id: ${eventId}\n`);
}
client.res.write(`data: ${JSON.stringify(data)}\n\n`);
}
});
}
}
// Example: Broadcast a notification
app.post('/notify', (req, res) => {
const { userId, message } = req.body;
broadcastEventToUser(userId, 'notification', message);
res.json({ sent: true });
});
// Heartbeat to prevent proxy timeout
setInterval(() => {
sseClients.forEach((clients) => {
clients.forEach((client) => {
if (!client.res.closed) {
client.res.write(': heartbeat\n\n');
}
});
});
}, 30000);
app.listen(3000, () => {
console.log('SSE server on port 3000');
});// Spring Boot SSE Server using SseEmitter
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@RestController
@RequestMapping
public class SseController {
private final SseService sseService;
public SseController(SseService sseService) {
this.sseService = sseService;
}
@GetMapping("/events")
public SseEmitter subscribe(@RequestParam String token) {
String userId = validateToken(token); // implement JWT validation
if (userId == null) throw new SecurityException("Invalid token");
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
sseService.addClient(userId, emitter);
emitter.onCompletion(() -> sseService.removeClient(userId, emitter));
emitter.onTimeout(() -> sseService.removeClient(userId, emitter));
emitter.onError(e -> sseService.removeClient(userId, emitter));
try {
emitter.send(SseEmitter.event().comment("connected"));
} catch (IOException e) {
emitter.completeWithError(e);
}
return emitter;
}
@PostMapping("/notify")
public Map<String, Boolean> notify(@RequestBody Map<String, Object> body) {
String userId = (String) body.get("userId");
Object message = body.get("message");
sseService.broadcastToUser(userId, "notification", message);
return Map.of("sent", true);
}
private String validateToken(String token) { return null; /* implement */ }
}
@Service
class SseService {
private final Map<String, Set<SseEmitter>> clients = new ConcurrentHashMap<>();
public void addClient(String userId, SseEmitter emitter) {
clients.computeIfAbsent(userId, k -> Collections.synchronizedSet(new HashSet<>())).add(emitter);
System.out.println("SSE client connected: " + userId);
}
public void removeClient(String userId, SseEmitter emitter) {
Set<SseEmitter> set = clients.get(userId);
if (set != null) {
set.remove(emitter);
if (set.isEmpty()) clients.remove(userId);
}
System.out.println("SSE client disconnected: " + userId);
}
public void broadcastToUser(String userId, String eventType, Object data) {
Set<SseEmitter> emitters = clients.getOrDefault(userId, Set.of());
Set<SseEmitter> dead = new HashSet<>();
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event().name(eventType).data(data));
} catch (IOException e) {
dead.add(emitter);
}
}
dead.forEach(e -> removeClient(userId, e));
}
@Scheduled(fixedDelay = 30000)
public void sendHeartbeat() {
clients.forEach((userId, emitters) -> {
Set<SseEmitter> dead = new HashSet<>();
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event().comment("heartbeat"));
} catch (IOException e) {
dead.add(emitter);
}
}
dead.forEach(e -> removeClient(userId, e));
});
}
}# FastAPI SSE Server
# pip install fastapi uvicorn sse-starlette
import asyncio
import json
from collections import defaultdict
from typing import AsyncGenerator, Dict, Set
from fastapi import FastAPI, Query, Request
from fastapi.responses import JSONResponse
from jose import jwt, JWTError
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
sse_clients: Dict[str, Set[asyncio.Queue]] = defaultdict(set)
def verify_token(token: str) -> dict:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
@app.get("/events")
async def sse_endpoint(request: Request, token: str = Query(...)):
try:
payload = verify_token(token)
user_id = payload["sub"]
except JWTError:
return JSONResponse({"error": "Invalid token"}, status_code=401)
queue: asyncio.Queue = asyncio.Queue()
sse_clients[user_id].add(queue)
print(f"SSE client connected: {user_id}")
async def event_generator() -> AsyncGenerator:
yield {"event": "connected", "data": ""}
try:
while True:
if await request.is_disconnected():
break
try:
event = await asyncio.wait_for(queue.get(), timeout=30)
yield event
except asyncio.TimeoutError:
# Heartbeat
yield {"comment": "heartbeat"}
finally:
sse_clients[user_id].discard(queue)
if not sse_clients[user_id]:
del sse_clients[user_id]
print(f"SSE client disconnected: {user_id}")
return EventSourceResponse(event_generator())
@app.post("/notify")
async def notify(body: dict):
user_id = body["userId"]
message = body["message"]
await broadcast_to_user(user_id, "notification", message)
return {"sent": True}
async def broadcast_to_user(user_id: str, event_type: str, data, event_id: str = None):
queues = list(sse_clients.get(user_id, []))
event = {"event": event_type, "data": json.dumps(data)}
if event_id:
event["id"] = event_id
for queue in queues:
await queue.put(event)// ASP.NET Core SSE Server (.NET 8)
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
var sseClients = new ConcurrentDictionary<string, ConcurrentBag<HttpResponse>>();
app.MapGet("/events", async (HttpContext context, string token) =>
{
var userId = ValidateToken(token);
if (userId == null) { context.Response.StatusCode = 401; return; }
context.Response.Headers["Content-Type"] = "text/event-stream";
context.Response.Headers["Cache-Control"] = "no-cache";
context.Response.Headers["Connection"] = "keep-alive";
context.Response.Headers["Access-Control-Allow-Origin"] = "*";
var res = context.Response;
sseClients.GetOrAdd(userId, _ => new ConcurrentBag<HttpResponse>()).Add(res);
Console.WriteLine($"SSE client connected: {userId}");
await res.WriteAsync(": connected\n\n");
await res.Body.FlushAsync();
try
{
// Keep connection open until client disconnects
var tcs = new TaskCompletionSource();
context.RequestAborted.Register(() => tcs.TrySetResult());
await tcs.Task;
}
finally
{
Console.WriteLine($"SSE client disconnected: {userId}");
}
});
app.MapPost("/notify", async (HttpContext context) =>
{
var body = await JsonSerializer.DeserializeAsync<JsonElement>(context.Request.Body);
var userId = body.GetProperty("userId").GetString()!;
var message = body.GetProperty("message");
await BroadcastToUser(sseClients, userId, "notification", message.ToString());
await context.Response.WriteAsJsonAsync(new { sent = true });
});
// Heartbeat background service
_ = Task.Run(async () =>
{
while (true)
{
await Task.Delay(30000);
foreach (var (userId, responses) in sseClients)
{
foreach (var res in responses)
{
try { await res.WriteAsync(": heartbeat\n\n"); await res.Body.FlushAsync(); }
catch { /* client disconnected */ }
}
}
}
});
static async Task BroadcastToUser(
ConcurrentDictionary<string, ConcurrentBag<HttpResponse>> clients,
string userId, string eventType, string data, string? eventId = null)
{
if (!clients.TryGetValue(userId, out var responses)) return;
var sb = new StringBuilder();
sb.Append($"event: {eventType}\n");
if (eventId != null) sb.Append($"id: {eventId}\n");
sb.Append($"data: {data}\n\n");
var payload = sb.ToString();
foreach (var res in responses)
{
try { await res.WriteAsync(payload); await res.Body.FlushAsync(); }
catch { /* client disconnected */ }
}
}
static string? ValidateToken(string token) => null; // implement JWT validation
app.Run();SSE Client
class SSEClient {
private eventSource: EventSource | null = null;
private url: string;
private token: string;
private reconnectAttempts = 0;
constructor(url: string, token: string) {
this.url = url;
this.token = token;
}
connect() {
const url = `${this.url}?token=${encodeURIComponent(this.token)}`;
this.eventSource = new EventSource(url);
this.eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('Notification:', data);
});
this.eventSource.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
console.log('Message:', data);
});
this.eventSource.onerror = (err) => {
console.error('EventSource error:', err);
if (this.eventSource?.readyState === EventSource.CLOSED) {
this.attemptReconnect();
}
};
console.log('SSE connected');
}
private attemptReconnect() {
this.reconnectAttempts++;
const delay = 1000 * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5));
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(() => this.connect(), delay);
}
disconnect() {
if (this.eventSource) {
this.eventSource.close();
}
}
}
// Usage
const client = new SSEClient('http://localhost:3000/events', 'your-jwt-token');
client.connect();// Java SSE Client using OkHttp
// Maven: com.squareup.okhttp3:okhttp
import okhttp3.*;
import okio.BufferedSource;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class SseClient {
private final OkHttpClient httpClient;
private final String url;
private final String token;
private int reconnectAttempts = 0;
private final int maxReconnectAttempts = 5;
public SseClient(String url, String token) {
this.url = url;
this.token = token;
this.httpClient = new OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS) // No timeout for streaming
.build();
}
public void connect() {
Request request = new Request.Builder()
.url(url + "?token=" + token)
.build();
httpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
System.err.println("SSE connection failed: " + e.getMessage());
attemptReconnect();
}
@Override
public void onResponse(Call call, Response response) throws IOException {
reconnectAttempts = 0;
System.out.println("SSE connected");
try (ResponseBody body = response.body()) {
if (body == null) return;
BufferedSource source = body.source();
String eventType = "";
while (!source.exhausted()) {
String line = source.readUtf8Line();
if (line == null) break;
if (line.startsWith("event:")) {
eventType = line.substring(6).trim();
} else if (line.startsWith("data:")) {
String data = line.substring(5).trim();
handleEvent(eventType, data);
eventType = "";
}
}
}
attemptReconnect();
}
});
}
private void handleEvent(String eventType, String data) {
switch (eventType) {
case "notification" -> System.out.println("Notification: " + data);
case "message" -> System.out.println("Message: " + data);
default -> System.out.println("Event [" + eventType + "]: " + data);
}
}
private void attemptReconnect() {
if (reconnectAttempts < maxReconnectAttempts) {
long delay = 1000L * (1L << Math.min(reconnectAttempts, 5));
reconnectAttempts++;
System.out.println("Reconnecting in " + delay + "ms...");
try { Thread.sleep(delay); } catch (InterruptedException ignored) {}
connect();
}
}
public void disconnect() {
httpClient.dispatcher().cancelAll();
}
}
// Usage
var client = new SseClient("http://localhost:3000/events", "your-jwt-token");
client.connect();# Python SSE Client
# pip install httpx
import asyncio
import json
import math
import httpx
class SSEClient:
def __init__(self, url: str, token: str):
self.url = url
self.token = token
self.reconnect_attempts = 0
async def connect(self):
stream_url = f"{self.url}?token={self.token}"
print("SSE connected")
try:
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream("GET", stream_url) as response:
self.reconnect_attempts = 0
event_type = ""
async for line in response.aiter_lines():
if line.startswith("event:"):
event_type = line[6:].strip()
elif line.startswith("data:"):
data = line[5:].strip()
self._handle_event(event_type, data)
event_type = ""
except Exception as e:
print(f"EventSource error: {e}")
await self._attempt_reconnect()
def _handle_event(self, event_type: str, data: str):
parsed = json.loads(data) if data.startswith("{") else data
if event_type == "notification":
print("Notification:", parsed)
elif event_type == "message":
print("Message:", parsed)
else:
print(f"Event [{event_type}]:", parsed)
async def _attempt_reconnect(self):
delay = 1000 * (2 ** min(self.reconnect_attempts, 5)) / 1000
self.reconnect_attempts += 1
print(f"Reconnecting in {delay:.0f}s...")
await asyncio.sleep(delay)
await self.connect()
# Usage
async def main():
client = SSEClient("http://localhost:3000/events", "your-jwt-token")
await client.connect()
asyncio.run(main())// C# SSE Client (.NET 8)
using System.Net.Http.Headers;
using System.Text.Json;
public class SseClient
{
private readonly HttpClient _httpClient;
private readonly string _url;
private readonly string _token;
private int _reconnectAttempts = 0;
private const int MaxReconnectAttempts = 5;
public SseClient(string url, string token)
{
_url = url;
_token = token;
_httpClient = new HttpClient { Timeout = System.Threading.Timeout.InfiniteTimeSpan };
}
public async Task ConnectAsync(CancellationToken cancellationToken = default)
{
var uri = $"{_url}?token={Uri.EscapeDataString(_token)}";
Console.WriteLine("SSE connected");
try
{
using var response = await _httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
response.EnsureSuccessStatusCode();
_reconnectAttempts = 0;
using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
using var reader = new StreamReader(stream);
string eventType = "";
while (!reader.EndOfStream && !cancellationToken.IsCancellationRequested)
{
var line = await reader.ReadLineAsync(cancellationToken);
if (line == null) break;
if (line.StartsWith("event:"))
eventType = line[6..].Trim();
else if (line.StartsWith("data:"))
{
var data = line[5..].Trim();
HandleEvent(eventType, data);
eventType = "";
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine($"EventSource error: {ex.Message}");
await AttemptReconnectAsync(cancellationToken);
}
}
private void HandleEvent(string eventType, string data)
{
switch (eventType)
{
case "notification":
Console.WriteLine($"Notification: {data}");
break;
case "message":
Console.WriteLine($"Message: {data}");
break;
default:
Console.WriteLine($"Event [{eventType}]: {data}");
break;
}
}
private async Task AttemptReconnectAsync(CancellationToken cancellationToken)
{
if (_reconnectAttempts < MaxReconnectAttempts)
{
var delay = 1000 * (int)Math.Pow(2, Math.Min(_reconnectAttempts, 5));
_reconnectAttempts++;
Console.WriteLine($"Reconnecting in {delay}ms...");
await Task.Delay(delay, cancellationToken);
await ConnectAsync(cancellationToken);
}
}
}
// Usage
var client = new SseClient("http://localhost:3000/events", "your-jwt-token");
await client.ConnectAsync();การตรวจสอบสิทธิ์สำหรับการเชื่อมต่อแบบถาวร
Token ใน Query Parameter
Pros: ทำงานกับ standard HTTP clients, ง่าย Cons: Token มองเห็นได้ใน logs, น้อยปลอดภัยบน unencrypted connections
const token = url.searchParams.get('token');
const decoded = verifyToken(token);
Token ใน Cookie
Pros: ส่งโดยอัตโนมัติกับทุก request, ปลอดภัย (HttpOnly flag) Cons: ต้องการ CORS preflight สำหรับ cross-origin
const token = req.cookies.auth_token;
const decoded = verifyToken(token);
Token ใน First Message
Pros: Clean separation, สนับสนุน bidirectional auth Cons: Window ของ unauthenticated connection
ws.on('message', (data) => {
if (!ws.authenticated) {
const { token } = JSON.parse(data);
ws.authenticated = verifyToken(token);
if (!ws.authenticated) {
ws.close(4001, 'Authentication failed');
}
return;
}
// Process authenticated message
});// Spring WebSocket TextWebSocketHandler
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
if (!Boolean.TRUE.equals(session.getAttributes().get("authenticated"))) {
JsonNode payload = objectMapper.readTree(message.getPayload());
String token = payload.get("token").asText();
boolean valid = jwtService.validateToken(token);
session.getAttributes().put("authenticated", valid);
if (!valid) {
session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Authentication failed"));
}
return;
}
// Process authenticated message
}# FastAPI WebSocket: authenticate via first message
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
authenticated = False
async for data in websocket.iter_text():
if not authenticated:
payload = json.loads(data)
try:
verify_token(payload.get("token"))
authenticated = True
except Exception:
await websocket.close(code=4001, reason="Authentication failed")
return
continue
# Process authenticated message
await handle_message(websocket, data)// ASP.NET Core WebSocket: authenticate via first message
public async Task HandleAsync(WebSocket ws, CancellationToken ct)
{
bool authenticated = false;
var buffer = new byte[4096];
while (ws.State == WebSocketState.Open)
{
var result = await ws.ReceiveAsync(buffer, ct);
var text = Encoding.UTF8.GetString(buffer, 0, result.Count);
if (!authenticated)
{
var payload = JsonSerializer.Deserialize<AuthMessage>(text);
if (!_jwtService.ValidateToken(payload?.Token))
{
await ws.CloseAsync(
(WebSocketCloseStatus)4001, "Authentication failed", ct);
return;
}
authenticated = true;
continue;
}
// Process authenticated message
await HandleMessageAsync(ws, text, ct);
}
}Production Recommendation: ใช้ secure cookies (HttpOnly, Secure, SameSite) หรือฝัง token ใน WebSocket URL ด้วย short expiration
Heartbeat และ Ping-Pong
เหตุใด Heartbeat?
Long-lived connections อาจถูก terminate อย่างเงียบ ๆ โดย proxies, firewalls, หรือ load balancers A heartbeat ตรวจสอบ dead connections เหล่านี้ก่อนที่ไคลเอนต์/เซิร์ฟเวอร์จะพยายามส่ง
WebSocket Ping-Pong
WebSocket มี built-in PING (0x9) และ PONG (0xA) opcodes เซิร์ฟเวอร์ส่ง PING, ไคลเอนต์ตอบสนองด้วย PONG
// Server: Send ping every 30 seconds
setInterval(() => {
wss.clients.forEach((ws: AuthenticatedWebSocket) => {
if (ws.isAlive === false) {
ws.terminate();
return;
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
// Client receives pong automatically; no handler needed unless custom logic// Spring WebSocket: schedule server-side ping via SimpMessagingTemplate
@Scheduled(fixedRate = 30000)
public void sendHeartbeat() {
for (WebSocketSession session : activeSessions.values()) {
if (!session.isOpen()) {
activeSessions.remove(session.getId());
continue;
}
try {
session.sendMessage(new PingMessage());
} catch (IOException e) {
log.warn("Ping failed for session {}", session.getId());
activeSessions.remove(session.getId());
}
}
}
// PongMessage is handled automatically by Spring WebSocket# FastAPI: periodic ping every 30 seconds
import asyncio
async def websocket_with_heartbeat(websocket: WebSocket):
await websocket.accept()
async def ping_task():
while True:
await asyncio.sleep(30)
try:
await websocket.send_text('{"type":"ping"}')
except Exception:
break
task = asyncio.create_task(ping_task())
try:
async for message in websocket.iter_text():
await handle_message(websocket, message)
finally:
task.cancel()// ASP.NET Core: periodic ping every 30 seconds
private async Task HeartbeatAsync(WebSocket ws, CancellationToken ct)
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(30));
while (await timer.WaitForNextTickAsync(ct) && ws.State == WebSocketState.Open)
{
var ping = Encoding.UTF8.GetBytes("{\"type\":\"ping\"}");
await ws.SendAsync(ping, WebSocketMessageType.Text, true, ct);
}
}
// Call Task.WhenAny(ReceiveLoopAsync(...), HeartbeatAsync(...)) to run bothSSE Heartbeat
SSE ใช้ comment frames หรือ custom events:
setInterval(() => {
sseClients.forEach((clients) => {
clients.forEach((client) => {
client.res.write(': heartbeat\n\n');
});
});
}, 30000);// Spring: SSE heartbeat via SseEmitter
@Scheduled(fixedRate = 30000)
public void sendSseHeartbeat() {
List<SseEmitter> dead = new ArrayList<>();
for (SseEmitter emitter : sseEmitters) {
try {
emitter.send(SseEmitter.event().comment("heartbeat"));
} catch (IOException e) {
dead.add(emitter);
}
}
sseEmitters.removeAll(dead);
}# FastAPI: SSE heartbeat via comment frame
async def sse_with_heartbeat():
while True:
yield ": heartbeat\n\n"
await asyncio.sleep(30)
@app.get("/events")
async def sse_endpoint():
return StreamingResponse(sse_with_heartbeat(), media_type="text/event-stream")// ASP.NET Core: SSE heartbeat via comment frame
private async IAsyncEnumerable<string> SseWithHeartbeatAsync(
[EnumeratorCancellation] CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
yield return ": heartbeat\n\n";
await Task.Delay(TimeSpan.FromSeconds(30), ct);
}
}กลยุทธ์การเชื่อมต่อใหม่
Exponential Backoff
ลองใหม่ด้วย increasing delays เพื่อหลีกเลี่ยง thundering herd:
private attemptReconnect() {
const maxDelay = 60000; // 60 seconds max
const delay = Math.min(
this.reconnectDelay * Math.pow(2, this.reconnectAttempts),
maxDelay,
);
this.reconnectAttempts++;
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => this.connect(), delay);
}// Exponential backoff reconnect
private void attemptReconnect() {
long maxDelay = 60_000L;
long delay = Math.min(
reconnectDelay * (long) Math.pow(2, reconnectAttempts),
maxDelay
);
reconnectAttempts++;
log.info("Reconnecting in {}ms (attempt {})", delay, reconnectAttempts);
scheduler.schedule(this::connect, delay, TimeUnit.MILLISECONDS);
}# Exponential backoff reconnect
async def attempt_reconnect(self):
max_delay = 60.0 # seconds
delay = min(self.reconnect_delay * (2 ** self.reconnect_attempts), max_delay)
self.reconnect_attempts += 1
print(f"Reconnecting in {delay:.1f}s (attempt {self.reconnect_attempts})")
await asyncio.sleep(delay)
await self.connect()// Exponential backoff reconnect
private async Task AttemptReconnectAsync(CancellationToken ct)
{
const double maxDelayMs = 60_000;
double delay = Math.Min(
_reconnectDelay * Math.Pow(2, _reconnectAttempts),
maxDelayMs
);
_reconnectAttempts++;
_logger.LogInformation("Reconnecting in {Delay}ms (attempt {Attempt})", delay, _reconnectAttempts);
await Task.Delay(TimeSpan.FromMilliseconds(delay), ct);
await ConnectAsync(ct);
}Jitter
เพิ่มความสุ่มเพื่อป้องกันการ reconnect ที่ synchronized:
const jitter = Math.random() * 1000;
const delayWithJitter = delay + jitter;
setTimeout(() => this.connect(), delayWithJitter);
State Preservation
Cache messages ขณะ disconnection, replay บน reconnect:
class RobustSSEClient {
private messageBuffer: any[] = [];
private lastEventId: string | null = null;
private handleMessage(event: MessageEvent) {
this.messageBuffer.push(JSON.parse(event.data));
this.lastEventId = event.lastEventId || this.lastEventId;
// Process message...
}
private reconnect() {
const url = `${this.url}?token=${this.token}`;
if (this.lastEventId) {
// Browser EventSource sends Last-Event-ID header automatically
// Server can use it to resume from checkpoint
}
this.eventSource = new EventSource(url);
}
}// Spring WebSocket: resume from last event ID
public class RobustSseClient {
private final List<Object> messageBuffer = new ArrayList<>();
private String lastEventId = null;
public void handleMessage(String data, String eventId) {
messageBuffer.add(parseJson(data));
if (eventId != null) lastEventId = eventId;
// Process message...
}
public void reconnect(String url, String token) {
WebClient.Builder builder = WebClient.builder().baseUrl(url);
if (lastEventId != null) {
builder.defaultHeader("Last-Event-ID", lastEventId);
}
builder.build()
.get()
.uri(uriBuilder -> uriBuilder.queryParam("token", token).build())
.retrieve()
.bodyToFlux(ServerSentEvent.class)
.subscribe(event -> handleMessage(
(String) event.data(), event.id()));
}
}# Python: SSE client with Last-Event-ID resume
import httpx
class RobustSseClient:
def __init__(self, url: str, token: str):
self.url = url
self.token = token
self.message_buffer: list = []
self.last_event_id: str | None = None
async def connect(self):
headers = {}
if self.last_event_id:
headers["Last-Event-ID"] = self.last_event_id
async with httpx.AsyncClient() as client:
async with client.stream(
"GET", self.url,
params={"token": self.token},
headers=headers,
) as response:
async for line in response.aiter_lines():
if line.startswith("id:"):
self.last_event_id = line[3:].strip()
elif line.startswith("data:"):
data = json.loads(line[5:].strip())
self.message_buffer.append(data)
# Process message...// ASP.NET Core: SSE client with Last-Event-ID resume
public class RobustSseClient
{
private readonly HttpClient _http;
private readonly string _url;
private readonly string _token;
private readonly List<object> _messageBuffer = new();
private string? _lastEventId;
public async Task ConnectAsync(CancellationToken ct)
{
using var request = new HttpRequestMessage(HttpMethod.Get,
$"{_url}?token={_token}");
if (_lastEventId is not null)
request.Headers.TryAddWithoutValidation("Last-Event-ID", _lastEventId);
using var response = await _http.SendAsync(
request, HttpCompletionOption.ResponseHeadersRead, ct);
using var reader = new StreamReader(await response.Content.ReadAsStreamAsync(ct));
string? line;
while ((line = await reader.ReadLineAsync()) is not null)
{
if (line.StartsWith("id:")) _lastEventId = line[3..].Trim();
else if (line.StartsWith("data:"))
{
var data = JsonSerializer.Deserialize<object>(line[5..].Trim());
_messageBuffer.Add(data!);
// Process message...
}
}
}
}การปรับขนาดด้วย Redis Pub/Sub
เมื่อ deploy ข้ามเซิร์ฟเวอร์หลายตัว, WebSocket connections บน Server A ไม่สามารถส่งข้อความโดยตรงไปยัง connections บน Server B ได้ Redis pub/sub ทำหน้าที่เป็น message broker:
import redis from 'redis';
const redisClient = redis.createClient();
const redisSubscriber = redis.createClient();
await redisClient.connect();
await redisSubscriber.connect();
const userConnections = new Map<string, Set<AuthenticatedWebSocket>>();
wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
const userId = (req as any).userId;
ws.userId = userId;
if (!userConnections.has(userId)) {
userConnections.set(userId, new Set());
}
userConnections.get(userId)!.add(ws);
ws.on('message', async (data) => {
const message = JSON.parse(data.toString());
if (message.type === 'chat') {
// Publish to Redis so other servers can route to this user
await redisClient.publish(`user:${message.recipientId}:messages`, JSON.stringify({
from: userId,
text: message.text,
timestamp: new Date(),
}));
}
});
});
// Subscribe to user-specific channels
async function subscribeToUser(userId: string) {
await redisSubscriber.subscribe(`user:${userId}:messages`, (message) => {
const parsedMessage = JSON.parse(message);
const connections = userConnections.get(userId);
if (connections) {
connections.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'chat',
from: parsedMessage.from,
text: parsedMessage.text,
}));
}
});
}
});
}
wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
const userId = (req as any).userId;
subscribeToUser(userId);
});// Spring Boot WebSocket + Redis Pub/Sub
// pom.xml: spring-boot-starter-data-redis, spring-boot-starter-websocket
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class RedisWebSocketBridge {
private final StringRedisTemplate redisTemplate;
private final SimpMessagingTemplate wsTemplate;
private final RedisMessageListenerContainer listenerContainer;
private final Map<String, MessageListener> listeners = new ConcurrentHashMap<>();
public RedisWebSocketBridge(
StringRedisTemplate redisTemplate,
SimpMessagingTemplate wsTemplate,
RedisMessageListenerContainer listenerContainer) {
this.redisTemplate = redisTemplate;
this.wsTemplate = wsTemplate;
this.listenerContainer = listenerContainer;
}
// Called when a user connects
public void subscribeToUser(String userId) {
String channel = "user:" + userId + ":messages";
MessageListener listener = (Message message, byte[] pattern) -> {
String payload = new String(message.getBody());
// Route Redis message to the user's WebSocket queue
wsTemplate.convertAndSendToUser(userId, "/queue/messages", payload);
};
listeners.put(userId, listener);
listenerContainer.addMessageListener(listener, new PatternTopic(channel));
}
// Called when a message should be sent to another user
public void publishToUser(String recipientId, String fromUserId, String text) {
String channel = "user:" + recipientId + ":messages";
String payload = String.format(
"{\"from\":\"%s\",\"text\":\"%s\",\"timestamp\":\"%s\"}",
fromUserId, text, java.time.Instant.now()
);
redisTemplate.convertAndSend(channel, payload);
}
// Called when a user disconnects
public void unsubscribeFromUser(String userId) {
MessageListener listener = listeners.remove(userId);
if (listener != null) {
listenerContainer.removeMessageListener(listener);
}
}
}# FastAPI WebSocket + Redis Pub/Sub
# pip install fastapi uvicorn redis websockets
import asyncio
import json
from collections import defaultdict
from typing import Dict, Set
import redis.asyncio as aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
app = FastAPI()
redis_client = aioredis.from_url("redis://localhost")
redis_subscriber = aioredis.from_url("redis://localhost")
user_connections: Dict[str, Set[WebSocket]] = defaultdict(set)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
user_id = validate_token(token) # implement JWT validation
if not user_id:
await websocket.close(code=4001)
return
await websocket.accept()
user_connections[user_id].add(websocket)
# Subscribe to this user's Redis channel
subscribe_task = asyncio.create_task(subscribe_to_user(user_id))
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
if message["type"] == "chat":
# Publish to Redis — any server can pick this up
await redis_client.publish(
f"user:{message['recipientId']}:messages",
json.dumps({
"from": user_id,
"text": message["text"],
"timestamp": asyncio.get_event_loop().time(),
}),
)
except WebSocketDisconnect:
user_connections[user_id].discard(websocket)
subscribe_task.cancel()
async def subscribe_to_user(user_id: str):
channel = f"user:{user_id}:messages"
async with redis_subscriber.pubsub() as pubsub:
await pubsub.subscribe(channel)
async for raw_message in pubsub.listen():
if raw_message["type"] != "message":
continue
payload = json.loads(raw_message["data"])
for ws in list(user_connections.get(user_id, [])):
try:
await ws.send_text(json.dumps({
"type": "chat",
"from": payload["from"],
"text": payload["text"],
}))
except Exception:
user_connections[user_id].discard(ws)
def validate_token(token: str):
return None # implement JWT validation// ASP.NET Core WebSocket + Redis Pub/Sub (.NET 8)
// NuGet: StackExchange.Redis, Microsoft.AspNetCore.WebSockets
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using StackExchange.Redis;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<IConnectionMultiplexer>(
ConnectionMultiplexer.Connect("localhost:6379"));
var app = builder.Build();
app.UseWebSockets();
var userConnections = new ConcurrentDictionary<string, ConcurrentBag<WebSocket>>();
var redis = app.Services.GetRequiredService<IConnectionMultiplexer>();
var subscriber = redis.GetSubscriber();
app.Map("/ws", async (HttpContext context) =>
{
if (!context.WebSockets.IsWebSocketRequest) { context.Response.StatusCode = 400; return; }
var token = context.Request.Query["token"].ToString();
var userId = ValidateToken(token);
if (userId == null) { context.Response.StatusCode = 401; return; }
var ws = await context.WebSockets.AcceptWebSocketAsync();
userConnections.GetOrAdd(userId, _ => new ConcurrentBag<WebSocket>()).Add(ws);
// Subscribe to this user's Redis channel
await subscriber.SubscribeAsync($"user:{userId}:messages", async (channel, value) =>
{
var payload = JsonSerializer.Deserialize<JsonElement>(value!);
var outgoing = JsonSerializer.Serialize(new
{
type = "chat",
from = payload.GetProperty("from").GetString(),
text = payload.GetProperty("text").GetString(),
});
if (ws.State == WebSocketState.Open)
await ws.SendAsync(Encoding.UTF8.GetBytes(outgoing),
WebSocketMessageType.Text, true, CancellationToken.None);
});
var buffer = new byte[4096];
var db = redis.GetDatabase();
while (ws.State == WebSocketState.Open)
{
var result = await ws.ReceiveAsync(buffer, CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close) break;
var json = Encoding.UTF8.GetString(buffer, 0, result.Count);
var message = JsonSerializer.Deserialize<JsonElement>(json);
if (message.GetProperty("type").GetString() == "chat")
{
var recipientId = message.GetProperty("recipientId").GetString()!;
var pubPayload = JsonSerializer.Serialize(new
{
from = userId,
text = message.GetProperty("text").GetString(),
timestamp = DateTime.UtcNow,
});
// Publish — any server subscribed to this channel will receive it
await db.PublishAsync($"user:{recipientId}:messages", pubPayload);
}
}
await subscriber.UnsubscribeAsync($"user:{userId}:messages");
});
static string? ValidateToken(string token) => null; // implement JWT validation
app.Run();Pattern Summary:
- ไคลเอนต์ส่งข้อความไปยัง Server A
- Server A publish ไปยัง Redis channel
user:RECIPIENT:messages - Server B (ที่เชื่อมต่อผู้รับ) รับข้อความจาก Redis
- Server B route ข้อความไปยัง WebSocket connection ของผู้รับ
Backpressure และ Flow Control
ปัญหา
ถ้าไคลเอนต์ส่งข้อความเร็วกว่าเซิร์ฟเวอร์ประมวลผล, internal buffer จะเติบโต, ดูดแรม ในทำนองเดียวกัน, ถ้าไคลเอนต์ไม่สามารถรับข้อความ (slow network), send buffer ของเซิร์ฟเวอร์จะเต็ม
WebSocket Backpressure
ws.on('message', async (data) => {
const message = JSON.parse(data.toString());
// Process asynchronously; don't block the event loop
try {
await processMessage(message);
} catch (err) {
console.error('Error processing message:', err);
}
});
// Check write buffer before sending
function sendMessage(ws: WebSocket, data: any) {
const serialized = JSON.stringify(data);
if (ws.bufferedAmount > 16 * 1024) {
console.warn('Write buffer exceeded 16KB; apply backpressure');
// Option 1: Drop message (acceptable for real-time streams)
// Option 2: Queue and retry later
// Option 3: Pause reading from client
return false;
}
ws.send(serialized);
return true;
}// Spring WebSocket backpressure with bounded queue
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@Component
public class BackpressureWebSocketSender {
private static final int MAX_QUEUE_SIZE = 1000;
private static final int WARN_THRESHOLD = 800;
private final SimpMessagingTemplate messagingTemplate;
public BackpressureWebSocketSender(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// Per-user bounded queue to absorb bursts and apply backpressure
private final java.util.concurrent.ConcurrentHashMap<String, BlockingQueue<Object>>
userQueues = new java.util.concurrent.ConcurrentHashMap<>();
public boolean sendToUser(String userId, Object message) {
BlockingQueue<Object> queue = userQueues.computeIfAbsent(
userId, k -> new ArrayBlockingQueue<>(MAX_QUEUE_SIZE)
);
if (queue.size() >= WARN_THRESHOLD) {
System.out.println("Backpressure warning for user " + userId
+ ": queue=" + queue.size());
}
boolean accepted = queue.offer(message); // non-blocking; drops if full
if (!accepted) {
System.out.println("Write buffer exceeded for user " + userId + "; dropping message");
return false;
}
// Drain queue and forward to WebSocket
Object pending;
while ((pending = queue.poll()) != null) {
messagingTemplate.convertAndSendToUser(userId, "/queue/messages", pending);
}
return true;
}
}# FastAPI WebSocket backpressure with asyncio.Queue
import asyncio
import json
from typing import Dict
MAX_QUEUE_SIZE = 1000
WARN_THRESHOLD = 800
send_queues: Dict[str, asyncio.Queue] = {}
async def send_with_backpressure(user_id: str, ws, data: dict) -> bool:
if user_id not in send_queues:
send_queues[user_id] = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
queue = send_queues[user_id]
if queue.qsize() >= WARN_THRESHOLD:
print(f"Backpressure warning for user {user_id}: queue={queue.qsize()}")
try:
queue.put_nowait(data) # Raises QueueFull if at capacity
except asyncio.QueueFull:
print(f"Write buffer exceeded 16KB equivalent for user {user_id}; dropping message")
return False
# Drain queue
while not queue.empty():
item = queue.get_nowait()
try:
await ws.send_text(json.dumps(item))
except Exception:
return False
return True
async def process_message(message: dict):
# Simulate async processing
await asyncio.sleep(0)// ASP.NET Core WebSocket backpressure with Channel<T>
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
public class BackpressureWebSocketSender
{
private const int MaxQueueSize = 1000;
private const int WarnThreshold = 800;
private readonly ConcurrentDictionary<string, Channel<object>> _userChannels = new();
public bool Enqueue(string userId, object message)
{
var channel = _userChannels.GetOrAdd(userId, _ =>
Channel.CreateBounded<object>(new BoundedChannelOptions(MaxQueueSize)
{
FullMode = BoundedChannelFullMode.DropOldest, // backpressure: drop oldest
SingleWriter = false,
SingleReader = true,
}));
if (channel.Reader.Count >= WarnThreshold)
Console.WriteLine($"Backpressure warning for user {userId}: queue={channel.Reader.Count}");
if (!channel.Writer.TryWrite(message))
{
Console.WriteLine($"Write buffer exceeded for user {userId}; dropping message");
return false;
}
return true;
}
public async Task DrainToWebSocket(string userId, WebSocket ws, CancellationToken ct)
{
if (!_userChannels.TryGetValue(userId, out var channel)) return;
await foreach (var item in channel.Reader.ReadAllAsync(ct))
{
if (ws.State != WebSocketState.Open) break;
var json = JsonSerializer.Serialize(item);
await ws.SendAsync(Encoding.UTF8.GetBytes(json),
WebSocketMessageType.Text, true, ct);
}
}
}SSE Backpressure
function broadcastToUser(userId: string, data: any): boolean {
const clients = sseClients.get(userId);
if (!clients) return true;
let allSuccess = true;
clients.forEach((client) => {
const canWrite = client.res.write(`data: ${JSON.stringify(data)}\n\n`);
if (!canWrite) {
console.warn(`Backpressure detected for user ${userId}`);
allSuccess = false;
// Apply backpressure: pause reading, implement queue, etc.
}
});
return allSuccess;
}// Java SSE backpressure — check emitter completion before sending
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Set;
public boolean broadcastToUser(String userId, String eventType, Object data,
java.util.Map<String, Set<SseEmitter>> clients) {
Set<SseEmitter> emitters = clients.getOrDefault(userId, Set.of());
boolean allSuccess = true;
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event().name(eventType).data(data));
} catch (IOException e) {
System.out.println("Backpressure detected for user " + userId + ": " + e.getMessage());
allSuccess = false;
// Remove dead emitter
emitter.completeWithError(e);
}
}
return allSuccess;
}# Python SSE backpressure — bounded asyncio.Queue per client
import asyncio
import json
from typing import Dict, Set
MAX_SSE_QUEUE = 100
# Each client gets a bounded queue; if full, backpressure is signaled
sse_queues: Dict[str, Set[asyncio.Queue]] = {}
async def broadcast_to_user(user_id: str, data: dict) -> bool:
queues = sse_queues.get(user_id, set())
all_success = True
for queue in list(queues):
try:
queue.put_nowait({"event": "message", "data": json.dumps(data)})
except asyncio.QueueFull:
print(f"Backpressure detected for user {user_id}")
all_success = False
# Drop oldest item to make room (alternative: drop newest)
queue.get_nowait()
queue.put_nowait({"event": "message", "data": json.dumps(data)})
return all_success// C# SSE backpressure — Channel<T> per response
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;
public class SseBackpressureSender
{
private readonly ConcurrentDictionary<string, Channel<string>> _userChannels = new();
public bool BroadcastToUser(string userId, object data)
{
if (!_userChannels.TryGetValue(userId, out var channel)) return true;
var payload = $"data: {JsonSerializer.Serialize(data)}\n\n";
if (!channel.Writer.TryWrite(payload))
{
Console.WriteLine($"Backpressure detected for user {userId}");
// Apply backpressure: pause reading, implement queue, etc.
return false;
}
return true;
}
public Channel<string> CreateChannelForUser(string userId)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // block writer until reader drains
});
_userChannels[userId] = channel;
return channel;
}
public void RemoveUser(string userId) => _userChannels.TryRemove(userId, out _);
}การเปรียบเทียบ: WebSocket vs SSE vs Long Polling
| Feature | WebSocket | SSE | Long Polling |
|---|---|---|---|
| Bidirectional | ใช่ | ไม่มี (separate HTTP POST) | ไม่มี (separate HTTP POST) |
| Latency | ต่ำ (< 50ms) | ต่ำ (~100ms) | สูง (1-10s) |
| Connection Overhead | Minimal (2-10 bytes/frame) | Minimal (text-based) | สูง (new HTTP headers per request) |
| Server Push | Native | Native | Simulated (client polls) |
| Browser Support | Modern browsers | Modern browsers | All browsers |
| Proxy Compatibility | ดี (needs WebSocket-aware proxies) | ยอดเยี่ยม (standard HTTP) | ยอดเยี่ยม (standard HTTP) |
| Scalability | Medium (persistent connections) | Medium (persistent connections) | ง่าย (stateless) |
| Use Case | Chat, gaming, real-time dashboards | Notifications, log streaming | Legacy systems, strict proxies |
Production Checklist
Connection Management
-
Connection Limits: ตั้งค่า per-server และ per-user limits เพื่อป้องกัน resource exhaustion
const MAX_CONNECTIONS_PER_SERVER = 10000; const MAX_CONNECTIONS_PER_USER = 5; if (userConnections.get(userId)?.size >= MAX_CONNECTIONS_PER_USER) { ws.close(4029, 'Too many connections'); return; } -
Timeout Handling: ปิด idle connections
const CONNECTION_IDLE_TIMEOUT = 5 * 60 * 1000; // 5 minutes ws.on('message', () => { clearTimeout(ws.idleTimer); ws.idleTimer = setTimeout(() => ws.close(4000, 'Idle timeout'), CONNECTION_IDLE_TIMEOUT); });
Load Balancer Configuration
- Sticky Sessions: Route user’s requests ไปยังเซิร์ฟเวอร์เดียวกันเพื่อให้แน่ใจว่า WebSocket upgrade สำเร็จ
# Nginx example upstream backend { least_conn; server backend1; server backend2; server backend3; } server { location /ws { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_read_timeout 86400; # Long timeout for persistent connections } }
Memory Management
-
Monitor Memory Usage: ใช้
process.memoryUsage()และตั้งค่า alertssetInterval(() => { const mem = process.memoryUsage(); console.log(`Memory: ${(mem.heapUsed / 1024 / 1024).toFixed(2)}MB / ${(mem.heapTotal / 1024 / 1024).toFixed(2)}MB`); if (mem.heapUsed > 1024 * 1024 * 1024) { // 1GB console.error('Heap usage critical; closing new connections'); // Reject new connections, gracefully close old ones } }, 10000); -
Message Queue Limits: ป้องกันการ unbounded growth ของ pending messages
const MAX_MESSAGE_QUEUE = 1000; if (messageQueue.length > MAX_MESSAGE_QUEUE) { messageQueue.shift(); // Drop oldest }
Monitoring และ Logging
-
Connection Metrics:
- Active WebSocket connections
- Connections by user/server
- Message throughput (msg/sec)
- Latency percentiles (p50, p95, p99)
- Error rates และ close codes
-
Structured Logging:
logger.info('ws:connected', { userId, remoteAddress: req.socket.remoteAddress, timestamp: new Date() }); logger.error('ws:error', { userId, error: err.message, code: err.code }); logger.warn('ws:backpressure', { userId, bufferedAmount: ws.bufferedAmount });
Graceful Shutdown
function gracefulShutdown() {
console.log('Shutting down gracefully...');
// Stop accepting new connections
server.close();
// Close all WebSocket connections
wss.clients.forEach((ws) => {
ws.close(1001, 'Server shutting down');
});
// Wait for graceful closure
setTimeout(() => {
console.log('Force shutting down');
process.exit(1);
}, 30000); // 30 seconds
}
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);// Spring Boot graceful shutdown
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class GracefulWebSocketShutdown {
private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final SimpMessagingTemplate messagingTemplate;
public GracefulWebSocketShutdown(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
public void registerSession(String sessionId, WebSocketSession session) {
sessions.put(sessionId, session);
}
@EventListener(ContextClosedEvent.class)
public void onShutdown() {
System.out.println("Shutting down gracefully...");
// Notify all connected clients
messagingTemplate.convertAndSend("/topic/shutdown",
java.util.Map.of("reason", "Server shutting down"));
// Close all sessions
for (WebSocketSession session : sessions.values()) {
try {
session.close(CloseStatus.SERVICE_RESTARTED);
} catch (IOException e) {
System.err.println("Error closing session: " + e.getMessage());
}
}
System.out.println("All WebSocket sessions closed");
}
}# FastAPI graceful shutdown
import asyncio
import json
import signal
from typing import Dict, Set
from fastapi import FastAPI, WebSocket
app = FastAPI()
active_websockets: Set[WebSocket] = set()
async def graceful_shutdown():
print("Shutting down gracefully...")
# Notify all clients
close_tasks = []
for ws in list(active_websockets):
try:
await ws.send_text(json.dumps({"type": "server_shutdown"}))
close_tasks.append(ws.close(1001))
except Exception:
pass
if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True)
print("All WebSocket connections closed")
@app.on_event("shutdown")
async def shutdown_event():
await graceful_shutdown()
# uvicorn handles SIGTERM/SIGINT and calls the shutdown event// ASP.NET Core graceful shutdown (.NET 8)
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Hosting;
public class GracefulShutdownService : IHostedService
{
public static readonly ConcurrentBag<WebSocket> ActiveSockets = new();
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public async Task StopAsync(CancellationToken cancellationToken)
{
Console.WriteLine("Shutting down gracefully...");
var shutdownMsg = JsonSerializer.Serialize(new { type = "server_shutdown" });
var bytes = Encoding.UTF8.GetBytes(shutdownMsg);
var tasks = ActiveSockets
.Where(ws => ws.State == WebSocketState.Open)
.Select(async ws =>
{
try
{
await ws.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None);
await ws.CloseAsync(WebSocketCloseStatus.EndpointUnavailable,
"Server shutting down", CancellationToken.None);
}
catch { /* ignore — client may have already disconnected */ }
});
// Wait up to 30 seconds for graceful closure
await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(30));
Console.WriteLine("All WebSocket connections closed");
}
}
// Register in Program.cs:
// builder.Services.AddHostedService<GracefulShutdownService>();Testing
- Connection Stability: ทำนาย network conditions (latency, packet loss, disconnects)
- Load Testing: ทดสอบด้วย thousands ของ concurrent connections
- Protocol Compliance: ตรวจสอบ frame structure, opcodes, และ close codes
บทสรุป
WebSocket และ SSE เป็นเครื่องมือที่มีประสิทธิภาพสำหรับการสื่อสารแบบเรียลไทม์ WebSocket เด่นในสถานการณ์ bidirectional, low-latency (chat, gaming, collaborative editing) SSE ทำงานได้ดีสำหรับ simple server-push use cases (notifications, log streaming) ที่มี proxy compatibility ดีกว่า
การปรับขนาด persistent connections ต้องการการประสานงานผ่าน message brokers, การจัดการแรมอย่างรอบคอบ, และ production-grade monitoring ด้วย proper architecture — heartbeats, reconnection strategies, load balancing, และ graceful shutdown — ทั้งสอง protocols สนับสนุน millions ของ concurrent users
เลือก WebSocket สำหรับ full bidirectionality และแอปพลิเคชันที่ latency-critical; เลือก SSE สำหรับ simplicity และ proxy compatibility และ always measure, monitor, และ test ภายใต้ realistic load
ดูเพิ่มเติมที่ การเชื่อสารระหว่างเซิร์ฟเวอร์: เทคโนโลยีอื่น ๆ