Deep Dive: WebSocket and SSE Communication
Real-time communication is foundational for modern applications. Whether you’re building chat applications, live notifications, collaborative editing tools, or trading dashboards, choosing the right protocol matters. This guide explores WebSocket and Server-Sent Events (SSE)—two powerful mechanisms for persistent, bidirectional, or server-initiated communication—and covers production patterns for scaling and reliability.
What are WebSocket and SSE?
Traditional HTTP follows a request-response model: the client initiates, the server responds, the connection closes. This pattern doesn’t fit real-time scenarios where the server needs to push updates independently to clients.
WebSocket and SSE solve this problem through persistent connections, but they take different approaches:
-
WebSocket: Upgrades an HTTP connection to a full-duplex, bidirectional protocol. Once established, either client or server can send messages at any time over the same TCP connection. Built on TCP, it provides low latency and supports binary frames.
-
Server-Sent Events (SSE): Uses HTTP as the transport and establishes a one-way channel where the server pushes text-based events to the client. The client can still send messages via separate HTTP requests. Simpler, uses standard HTTP infrastructure, but unidirectional.
The sequence diagram below illustrates a typical WebSocket flow:
sequenceDiagram
participant Client
participant Server
Note over Client,Server: WebSocket Upgrade Handshake
Client->>Server: HTTP Upgrade Request (Connection, Upgrade headers)
Server->>Client: HTTP 101 Switching Protocols
Note over Client,Server: Persistent Connection Established
Server->>Client: Push Update (Frame)
Client->>Server: Client Message (Frame)
Server->>Client: Push Update (Frame)
Client->>Server: Ping
Server->>Client: Pong
Note over Client,Server: Connection Close
Client->>Server: Close Frame
Server->>Client: Close Frame (Acknowledgment)
Core Principles
- Persistent Connections: Both WebSocket and SSE maintain open TCP connections, eliminating the overhead of repeated HTTP handshakes.
- Low Latency: Messages flow immediately without polling delays or request round-trips.
- Resource Efficiency: A single connection handles all bidirectional (WebSocket) or server-initiated (SSE) messages.
- Scalability Challenges: Persistent connections consume server resources; scaling horizontally requires coordination via message brokers (Redis pub/sub).
- Protocol Overhead: WebSocket frames add minimal overhead (2-10 bytes per frame). SSE uses HTTP semantics, slightly heavier but compatible with proxies.
- Browser Support: Both are well-supported in modern browsers; WebSocket requires explicit protocol negotiation, SSE uses EventSource API.
The WebSocket Protocol
Upgrade Handshake
WebSocket begins with an HTTP upgrade. The client sends an HTTP GET request with specific headers:
GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Version: 13
The server responds with HTTP 101:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
The Sec-WebSocket-Key is a base64-encoded random nonce; the server computes Sec-WebSocket-Accept by concatenating the key with a magic string (258EAFA5-E910-5DAB-8B74-38787C8F4C7E), SHA-1 hashing, and base64 encoding. This prevents cache poisoning and confirms protocol compliance.
WebSocket Frames
Once upgraded, data flows as frames. Each frame has:
- FIN bit (1 bit): Marks the last fragment of a message.
- Opcode (4 bits): Defines frame type (see below).
- Mask bit (1 bit): Client-to-server frames are masked for security; server-to-client frames are not.
- Payload length (7, 21, or 63 bits): Message size.
- Masking key (32 bits, if masked): XOR key for payload encryption.
- Payload data: The actual message.
Opcodes
| Opcode | Name | Purpose |
|---|---|---|
| 0x0 | Continuation | Continuation of a fragmented message |
| 0x1 | Text | UTF-8 text message |
| 0x2 | Binary | Binary data |
| 0x8 | Close | Initiate connection close |
| 0x9 | Ping | Request a pong (connection health check) |
| 0xA | Pong | Respond to a ping |
Implementing WebSocket with Node.js
WebSocket Server (ws library)
The ws library is a lightweight, production-ready WebSocket server implementation.
import WebSocket, { Server as WebSocketServer } from 'ws';
import http from 'http';
import { verifyToken } from './auth';
const server = http.createServer();
const wss = new WebSocketServer({ server });
interface AuthenticatedWebSocket extends WebSocket {
userId?: string;
isAlive?: boolean;
}
// Track active connections per user
const userConnections = new Map<string, Set<AuthenticatedWebSocket>>();
// Middleware-style authentication
function verifyClient(
info: { origin: string; secure: boolean; req: http.IncomingMessage },
callback: (success: boolean, code?: number, message?: string) => void,
) {
const url = new URL(info.req.url || '', 'http://localhost');
const token = url.searchParams.get('token');
if (!token) {
callback(false, 4001, 'Token required');
return;
}
try {
const decoded = verifyToken(token);
(info.req as any).userId = decoded.sub;
callback(true);
} catch (err) {
callback(false, 4001, 'Invalid token');
}
}
const wss = new WebSocketServer({
server,
verifyClient,
clientTracking: false, // Manage connections manually for multi-server setups
});
wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
const userId = (req as any).userId;
ws.userId = userId;
ws.isAlive = true;
// Track connection
if (!userConnections.has(userId)) {
userConnections.set(userId, new Set());
}
userConnections.get(userId)!.add(ws);
console.log(`User ${userId} connected`);
// Heartbeat
ws.on('pong', () => {
ws.isAlive = true;
});
// Handle incoming messages
ws.on('message', async (data: WebSocket.Data) => {
try {
const message = JSON.parse(data.toString());
switch (message.type) {
case 'chat':
broadcastToUser(message.recipientId, {
type: 'chat',
from: userId,
text: message.text,
timestamp: new Date(),
});
break;
case 'presence':
broadcastToUser(userId, {
type: 'user_status',
userId,
status: message.status,
});
break;
default:
ws.send(JSON.stringify({ error: 'Unknown message type' }));
}
} catch (err) {
ws.send(JSON.stringify({ error: 'Invalid message format' }));
}
});
// Handle errors
ws.on('error', (err) => {
console.error(`WebSocket error for user ${userId}:`, err);
});
// Handle close
ws.on('close', (code, reason) => {
const connections = userConnections.get(userId);
if (connections) {
connections.delete(ws);
if (connections.size === 0) {
userConnections.delete(userId);
}
}
console.log(`User ${userId} disconnected (code: ${code})`);
});
});
// Heartbeat interval (detects dead connections)
setInterval(() => {
wss.clients.forEach((ws: AuthenticatedWebSocket) => {
if (ws.isAlive === false) {
ws.terminate();
return;
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
function broadcastToUser(userId: string, message: any) {
const connections = userConnections.get(userId);
if (connections) {
connections.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
});
}
}
server.listen(3000, () => {
console.log('WebSocket server listening on port 3000');
});// Spring Boot WebSocket Server with STOMP
// pom.xml: spring-boot-starter-websocket, jjwt
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import org.springframework.web.socket.config.annotation.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic", "/queue");
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
}
@Controller
@EnableScheduling
public class WebSocketController {
private final SimpMessagingTemplate messagingTemplate;
private final Map<String, Set<String>> userSessions = new ConcurrentHashMap<>();
public WebSocketController(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
@MessageMapping("/chat")
public void handleChat(@Payload ChatMessage message, Principal principal) {
String fromUserId = principal.getName();
ChatMessage outgoing = new ChatMessage(
"chat", fromUserId, message.getText(), new Date()
);
// Route to recipient's personal queue
messagingTemplate.convertAndSendToUser(
message.getRecipientId(), "/queue/messages", outgoing
);
}
@MessageMapping("/presence")
public void handlePresence(@Payload PresenceMessage message, Principal principal) {
String userId = principal.getName();
messagingTemplate.convertAndSendToUser(
userId, "/queue/status",
new StatusMessage(userId, message.getStatus())
);
}
// Heartbeat: Spring STOMP handles ping/pong at the protocol level
@Scheduled(fixedDelay = 30000)
public void sendHeartbeat() {
messagingTemplate.convertAndSend("/topic/heartbeat", Map.of("type", "ping"));
}
public record ChatMessage(String type, String from, String text, Date timestamp) {
public String getRecipientId() { return ""; } // set via payload
public String getText() { return text; }
public String getRecipientId(String s) { return s; }
}
public record PresenceMessage(String status) {
public String getStatus() { return status; }
}
public record StatusMessage(String userId, String status) {}
}# FastAPI WebSocket Server
# pip install fastapi uvicorn websockets python-jose
import asyncio
import json
from collections import defaultdict
from datetime import datetime
from typing import Dict, Set
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, status
from jose import jwt, JWTError
app = FastAPI()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
# Track active connections per user
user_connections: Dict[str, Set[WebSocket]] = defaultdict(set)
def verify_token(token: str) -> dict:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
# Authenticate before accepting
try:
payload = verify_token(token)
user_id = payload["sub"]
except JWTError:
await websocket.close(code=4001)
return
await websocket.accept()
user_connections[user_id].add(websocket)
print(f"User {user_id} connected")
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
if message["type"] == "chat":
await broadcast_to_user(
message["recipientId"],
{
"type": "chat",
"from": user_id,
"text": message["text"],
"timestamp": datetime.utcnow().isoformat(),
},
)
elif message["type"] == "presence":
await broadcast_to_user(
user_id,
{"type": "user_status", "userId": user_id, "status": message["status"]},
)
else:
await websocket.send_text(json.dumps({"error": "Unknown message type"}))
except WebSocketDisconnect:
user_connections[user_id].discard(websocket)
if not user_connections[user_id]:
del user_connections[user_id]
print(f"User {user_id} disconnected")
except Exception as e:
print(f"WebSocket error for user {user_id}: {e}")
user_connections[user_id].discard(websocket)
async def broadcast_to_user(user_id: str, message: dict):
connections = user_connections.get(user_id, set())
dead = set()
for ws in connections:
try:
await ws.send_text(json.dumps(message))
except Exception:
dead.add(ws)
connections -= dead
# Heartbeat task
async def heartbeat_task():
while True:
await asyncio.sleep(30)
for user_id, connections in list(user_connections.items()):
dead = set()
for ws in connections:
try:
await ws.send_text(json.dumps({"type": "ping"}))
except Exception:
dead.add(ws)
connections -= dead
@app.on_event("startup")
async def startup():
asyncio.create_task(heartbeat_task())// ASP.NET Core WebSocket Server
// No extra packages needed — built-in WebSocket support
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.UseWebSockets(new WebSocketOptions { KeepAliveInterval = TimeSpan.FromSeconds(30) });
// Track connections per user
var userConnections = new ConcurrentDictionary<string, ConcurrentBag<WebSocket>>();
app.Map("/ws", async (HttpContext context) =>
{
if (!context.WebSockets.IsWebSocketRequest)
{
context.Response.StatusCode = 400;
return;
}
var token = context.Request.Query["token"].ToString();
var userId = ValidateToken(token); // implement your JWT validation
if (userId == null)
{
context.Response.StatusCode = 401;
return;
}
var ws = await context.WebSockets.AcceptWebSocketAsync();
userConnections.GetOrAdd(userId, _ => new ConcurrentBag<WebSocket>()).Add(ws);
Console.WriteLine($"User {userId} connected");
var buffer = new byte[4096];
try
{
while (ws.State == WebSocketState.Open)
{
var result = await ws.ReceiveAsync(buffer, CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close)
break;
var json = Encoding.UTF8.GetString(buffer, 0, result.Count);
var message = JsonSerializer.Deserialize<JsonElement>(json);
var type = message.GetProperty("type").GetString();
if (type == "chat")
{
var recipientId = message.GetProperty("recipientId").GetString()!;
var outgoing = JsonSerializer.Serialize(new
{
type = "chat",
from = userId,
text = message.GetProperty("text").GetString(),
timestamp = DateTime.UtcNow,
});
await BroadcastToUser(userConnections, recipientId, outgoing);
}
}
}
finally
{
var bag = userConnections.GetValueOrDefault(userId);
// ConcurrentBag doesn't support remove; swap to a cleaner structure in production
Console.WriteLine($"User {userId} disconnected");
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Bye", CancellationToken.None);
}
});
static async Task BroadcastToUser(
ConcurrentDictionary<string, ConcurrentBag<WebSocket>> connections,
string userId, string message)
{
if (!connections.TryGetValue(userId, out var sockets)) return;
var bytes = Encoding.UTF8.GetBytes(message);
foreach (var ws in sockets)
{
if (ws.State == WebSocketState.Open)
await ws.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None);
}
}
static string? ValidateToken(string token)
{
// Implement JWT validation; return userId or null
return null;
}
app.Run();WebSocket Client (Browser)
class WebSocketClient {
private ws: WebSocket | null = null;
private url: string;
private token: string;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 1000;
constructor(url: string, token: string) {
this.url = url;
this.token = token;
}
connect(): Promise<void> {
return new Promise((resolve, reject) => {
try {
this.ws = new WebSocket(
`${this.url}?token=${encodeURIComponent(this.token)}`,
);
this.ws.onopen = () => {
console.log('Connected');
this.reconnectAttempts = 0;
resolve();
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
this.ws.onerror = (event) => {
console.error('WebSocket error:', event);
reject(new Error('Connection failed'));
};
this.ws.onclose = () => {
console.log('Disconnected');
this.attemptReconnect();
};
} catch (err) {
reject(err);
}
});
}
send(message: any) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
console.warn('WebSocket not connected');
}
}
private attemptReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(() => this.connect().catch(console.error), delay);
}
}
private handleMessage(message: any) {
switch (message.type) {
case 'chat':
console.log(`Message from ${message.from}: ${message.text}`);
break;
default:
console.log('Received:', message);
}
}
disconnect() {
if (this.ws) {
this.ws.close(1000, 'Normal closure');
}
}
}
// Usage
const client = new WebSocketClient(
'ws://localhost:3000/ws',
'your-jwt-token',
);
await client.connect();
client.send({ type: 'chat', recipientId: 'user123', text: 'Hello!' });// Java WebSocket Client using Jakarta WebSocket API
// Maven: jakarta.websocket-api, tyrus-standalone-client
import jakarta.websocket.*;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ClientEndpoint
public class WebSocketClient {
private Session session;
private final String url;
private final String token;
private int reconnectAttempts = 0;
private final int maxReconnectAttempts = 5;
private final long reconnectDelayMs = 1000;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public WebSocketClient(String url, String token) {
this.url = url;
this.token = token;
}
public CompletableFuture<Void> connect() {
var future = new CompletableFuture<Void>();
try {
var container = ContainerProvider.getWebSocketContainer();
container.connectToServer(this, URI.create(url + "?token=" + token));
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}
@OnOpen
public void onOpen(Session session) {
this.session = session;
this.reconnectAttempts = 0;
System.out.println("Connected");
}
@OnMessage
public void onMessage(String messageJson) {
// Parse and handle message
System.out.println("Received: " + messageJson);
}
@OnClose
public void onClose(Session session, CloseReason reason) {
System.out.println("Disconnected: " + reason);
attemptReconnect();
}
@OnError
public void onError(Session session, Throwable error) {
System.err.println("WebSocket error: " + error.getMessage());
}
public void send(String messageJson) {
if (session != null && session.isOpen()) {
session.getAsyncRemote().sendText(messageJson);
} else {
System.out.println("WebSocket not connected");
}
}
private void attemptReconnect() {
if (reconnectAttempts < maxReconnectAttempts) {
long delay = reconnectDelayMs * (1L << reconnectAttempts);
reconnectAttempts++;
System.out.println("Reconnecting in " + delay + "ms...");
scheduler.schedule(() -> connect(), delay, TimeUnit.MILLISECONDS);
}
}
public void disconnect() {
if (session != null && session.isOpen()) {
try { session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Normal closure")); }
catch (Exception ignored) {}
}
}
}
// Usage
var client = new WebSocketClient("ws://localhost:3000/ws", "your-jwt-token");
client.connect().join();
client.send("{\"type\":\"chat\",\"recipientId\":\"user123\",\"text\":\"Hello!\"}");# Python WebSocket Client
# pip install websockets
import asyncio
import json
import math
from urllib.parse import quote
import websockets
from websockets.exceptions import ConnectionClosed
class WebSocketClient:
def __init__(self, url: str, token: str):
self.url = url
self.token = token
self.ws = None
self.reconnect_attempts = 0
self.max_reconnect_attempts = 5
self.reconnect_delay = 1.0
async def connect(self):
uri = f"{self.url}?token={quote(self.token)}"
self.ws = await websockets.connect(uri)
self.reconnect_attempts = 0
print("Connected")
asyncio.create_task(self._receive_loop())
async def _receive_loop(self):
try:
async for raw in self.ws:
message = json.loads(raw)
self._handle_message(message)
except ConnectionClosed:
print("Disconnected")
await self._attempt_reconnect()
def _handle_message(self, message: dict):
if message.get("type") == "chat":
print(f"Message from {message['from']}: {message['text']}")
else:
print("Received:", message)
async def send(self, message: dict):
if self.ws and not self.ws.closed:
await self.ws.send(json.dumps(message))
else:
print("WebSocket not connected")
async def _attempt_reconnect(self):
if self.reconnect_attempts < self.max_reconnect_attempts:
delay = self.reconnect_delay * (2 ** self.reconnect_attempts)
self.reconnect_attempts += 1
print(f"Reconnecting in {delay:.0f}ms...")
await asyncio.sleep(delay / 1000)
await self.connect()
async def disconnect(self):
if self.ws:
await self.ws.close(1000, "Normal closure")
# Usage
async def main():
client = WebSocketClient("ws://localhost:3000/ws", "your-jwt-token")
await client.connect()
await client.send({"type": "chat", "recipientId": "user123", "text": "Hello!"})
await asyncio.sleep(5)
await client.disconnect()
asyncio.run(main())// C# WebSocket Client (.NET 8)
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
public class WebSocketClient
{
private ClientWebSocket? _ws;
private readonly string _url;
private readonly string _token;
private int _reconnectAttempts = 0;
private const int MaxReconnectAttempts = 5;
private const int ReconnectDelayMs = 1000;
public WebSocketClient(string url, string token)
{
_url = url;
_token = token;
}
public async Task ConnectAsync()
{
_ws = new ClientWebSocket();
await _ws.ConnectAsync(new Uri($"{_url}?token={Uri.EscapeDataString(_token)}"), CancellationToken.None);
_reconnectAttempts = 0;
Console.WriteLine("Connected");
_ = ReceiveLoopAsync();
}
private async Task ReceiveLoopAsync()
{
var buffer = new byte[4096];
try
{
while (_ws!.State == WebSocketState.Open)
{
var result = await _ws.ReceiveAsync(buffer, CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close) break;
var json = Encoding.UTF8.GetString(buffer, 0, result.Count);
var message = JsonSerializer.Deserialize<JsonElement>(json);
HandleMessage(message);
}
}
catch (Exception ex)
{
Console.Error.WriteLine($"WebSocket error: {ex.Message}");
}
Console.WriteLine("Disconnected");
await AttemptReconnectAsync();
}
private void HandleMessage(JsonElement message)
{
var type = message.GetProperty("type").GetString();
if (type == "chat")
Console.WriteLine($"Message from {message.GetProperty("from").GetString()}: {message.GetProperty("text").GetString()}");
else
Console.WriteLine($"Received: {message}");
}
public async Task SendAsync(object message)
{
if (_ws?.State == WebSocketState.Open)
{
var json = JsonSerializer.Serialize(message);
await _ws.SendAsync(Encoding.UTF8.GetBytes(json), WebSocketMessageType.Text, true, CancellationToken.None);
}
else
{
Console.WriteLine("WebSocket not connected");
}
}
private async Task AttemptReconnectAsync()
{
if (_reconnectAttempts < MaxReconnectAttempts)
{
var delay = ReconnectDelayMs * (int)Math.Pow(2, _reconnectAttempts);
_reconnectAttempts++;
Console.WriteLine($"Reconnecting in {delay}ms...");
await Task.Delay(delay);
await ConnectAsync();
}
}
public async Task DisconnectAsync()
{
if (_ws != null)
await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Normal closure", CancellationToken.None);
}
}
// Usage
var client = new WebSocketClient("ws://localhost:3000/ws", "your-jwt-token");
await client.ConnectAsync();
await client.SendAsync(new { type = "chat", recipientId = "user123", text = "Hello!" });Server-Sent Events (SSE)
EventSource API
SSE uses the browser’s native EventSource API. The server sends HTTP response headers:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Then sends data as newline-delimited events:
event: message
data: {"type":"chat","from":"user1","text":"Hello"}
event: notification
data: New notification arrived
: This is a comment (ignored by client)
data: {"action":"update","timestamp":"2025-04-20T10:30:00Z"}
retry: 5000
Fields:
- event: Custom event type (JavaScript
addEventListenerwill fire this event). - data: Payload (can be JSON or plain text).
- id: Last-event-id for reconnection (server sends what client should retry from).
- retry: Milliseconds before browser automatically reconnects.
- Comments: Lines starting with
:are ignored.
Implementing SSE with Express
import express from 'express';
import { verifyToken } from './auth';
const app = express();
interface SSEClient {
userId: string;
res: express.Response;
lastEventId?: string;
}
const sseClients = new Map<string, Set<SSEClient>>();
// SSE endpoint
app.get('/events', (req, res) => {
const token = req.query.token as string;
if (!token) {
res.status(401).json({ error: 'Token required' });
return;
}
try {
const decoded = verifyToken(token);
const userId = decoded.sub;
// Set SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
// Support reconnection with Last-Event-ID
const lastEventId = req.headers['last-event-id'] as string | undefined;
const client: SSEClient = { userId, res, lastEventId };
// Track client
if (!sseClients.has(userId)) {
sseClients.set(userId, new Set());
}
sseClients.get(userId)!.add(client);
console.log(`SSE client connected: ${userId}`);
// Send initial heartbeat
res.write(': connected\n\n');
// Handle client disconnect
res.on('close', () => {
const clients = sseClients.get(userId);
if (clients) {
clients.delete(client);
if (clients.size === 0) {
sseClients.delete(userId);
}
}
console.log(`SSE client disconnected: ${userId}`);
});
res.on('error', (err) => {
console.error(`SSE error for user ${userId}:`, err);
res.end();
});
} catch (err) {
res.status(401).json({ error: 'Invalid token' });
}
});
// Broadcast function
function broadcastEventToUser(userId: string, eventType: string, data: any, eventId?: string) {
const clients = sseClients.get(userId);
if (clients) {
clients.forEach((client) => {
if (!client.res.closed) {
client.res.write(`event: ${eventType}\n`);
if (eventId) {
client.res.write(`id: ${eventId}\n`);
}
client.res.write(`data: ${JSON.stringify(data)}\n\n`);
}
});
}
}
// Example: Broadcast a notification
app.post('/notify', (req, res) => {
const { userId, message } = req.body;
broadcastEventToUser(userId, 'notification', message);
res.json({ sent: true });
});
// Heartbeat to prevent proxy timeout
setInterval(() => {
sseClients.forEach((clients) => {
clients.forEach((client) => {
if (!client.res.closed) {
client.res.write(': heartbeat\n\n');
}
});
});
}, 30000);
app.listen(3000, () => {
console.log('SSE server on port 3000');
});// Spring Boot SSE Server using SseEmitter
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@RestController
@RequestMapping
public class SseController {
private final SseService sseService;
public SseController(SseService sseService) {
this.sseService = sseService;
}
@GetMapping("/events")
public SseEmitter subscribe(@RequestParam String token) {
String userId = validateToken(token); // implement JWT validation
if (userId == null) throw new SecurityException("Invalid token");
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
sseService.addClient(userId, emitter);
emitter.onCompletion(() -> sseService.removeClient(userId, emitter));
emitter.onTimeout(() -> sseService.removeClient(userId, emitter));
emitter.onError(e -> sseService.removeClient(userId, emitter));
try {
emitter.send(SseEmitter.event().comment("connected"));
} catch (IOException e) {
emitter.completeWithError(e);
}
return emitter;
}
@PostMapping("/notify")
public Map<String, Boolean> notify(@RequestBody Map<String, Object> body) {
String userId = (String) body.get("userId");
Object message = body.get("message");
sseService.broadcastToUser(userId, "notification", message);
return Map.of("sent", true);
}
private String validateToken(String token) { return null; /* implement */ }
}
@Service
class SseService {
private final Map<String, Set<SseEmitter>> clients = new ConcurrentHashMap<>();
public void addClient(String userId, SseEmitter emitter) {
clients.computeIfAbsent(userId, k -> Collections.synchronizedSet(new HashSet<>())).add(emitter);
System.out.println("SSE client connected: " + userId);
}
public void removeClient(String userId, SseEmitter emitter) {
Set<SseEmitter> set = clients.get(userId);
if (set != null) {
set.remove(emitter);
if (set.isEmpty()) clients.remove(userId);
}
System.out.println("SSE client disconnected: " + userId);
}
public void broadcastToUser(String userId, String eventType, Object data) {
Set<SseEmitter> emitters = clients.getOrDefault(userId, Set.of());
Set<SseEmitter> dead = new HashSet<>();
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event().name(eventType).data(data));
} catch (IOException e) {
dead.add(emitter);
}
}
dead.forEach(e -> removeClient(userId, e));
}
@Scheduled(fixedDelay = 30000)
public void sendHeartbeat() {
clients.forEach((userId, emitters) -> {
Set<SseEmitter> dead = new HashSet<>();
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event().comment("heartbeat"));
} catch (IOException e) {
dead.add(emitter);
}
}
dead.forEach(e -> removeClient(userId, e));
});
}
}# FastAPI SSE Server
# pip install fastapi uvicorn sse-starlette
import asyncio
import json
from collections import defaultdict
from typing import AsyncGenerator, Dict, Set
from fastapi import FastAPI, Query, Request
from fastapi.responses import JSONResponse
from jose import jwt, JWTError
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
sse_clients: Dict[str, Set[asyncio.Queue]] = defaultdict(set)
def verify_token(token: str) -> dict:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
@app.get("/events")
async def sse_endpoint(request: Request, token: str = Query(...)):
try:
payload = verify_token(token)
user_id = payload["sub"]
except JWTError:
return JSONResponse({"error": "Invalid token"}, status_code=401)
queue: asyncio.Queue = asyncio.Queue()
sse_clients[user_id].add(queue)
print(f"SSE client connected: {user_id}")
async def event_generator() -> AsyncGenerator:
yield {"event": "connected", "data": ""}
try:
while True:
if await request.is_disconnected():
break
try:
event = await asyncio.wait_for(queue.get(), timeout=30)
yield event
except asyncio.TimeoutError:
# Heartbeat
yield {"comment": "heartbeat"}
finally:
sse_clients[user_id].discard(queue)
if not sse_clients[user_id]:
del sse_clients[user_id]
print(f"SSE client disconnected: {user_id}")
return EventSourceResponse(event_generator())
@app.post("/notify")
async def notify(body: dict):
user_id = body["userId"]
message = body["message"]
await broadcast_to_user(user_id, "notification", message)
return {"sent": True}
async def broadcast_to_user(user_id: str, event_type: str, data, event_id: str = None):
queues = list(sse_clients.get(user_id, []))
event = {"event": event_type, "data": json.dumps(data)}
if event_id:
event["id"] = event_id
for queue in queues:
await queue.put(event)// ASP.NET Core SSE Server (.NET 8)
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
var sseClients = new ConcurrentDictionary<string, ConcurrentBag<HttpResponse>>();
app.MapGet("/events", async (HttpContext context, string token) =>
{
var userId = ValidateToken(token);
if (userId == null) { context.Response.StatusCode = 401; return; }
context.Response.Headers["Content-Type"] = "text/event-stream";
context.Response.Headers["Cache-Control"] = "no-cache";
context.Response.Headers["Connection"] = "keep-alive";
context.Response.Headers["Access-Control-Allow-Origin"] = "*";
var res = context.Response;
sseClients.GetOrAdd(userId, _ => new ConcurrentBag<HttpResponse>()).Add(res);
Console.WriteLine($"SSE client connected: {userId}");
await res.WriteAsync(": connected\n\n");
await res.Body.FlushAsync();
try
{
// Keep connection open until client disconnects
var tcs = new TaskCompletionSource();
context.RequestAborted.Register(() => tcs.TrySetResult());
await tcs.Task;
}
finally
{
Console.WriteLine($"SSE client disconnected: {userId}");
}
});
app.MapPost("/notify", async (HttpContext context) =>
{
var body = await JsonSerializer.DeserializeAsync<JsonElement>(context.Request.Body);
var userId = body.GetProperty("userId").GetString()!;
var message = body.GetProperty("message");
await BroadcastToUser(sseClients, userId, "notification", message.ToString());
await context.Response.WriteAsJsonAsync(new { sent = true });
});
// Heartbeat background service
_ = Task.Run(async () =>
{
while (true)
{
await Task.Delay(30000);
foreach (var (userId, responses) in sseClients)
{
foreach (var res in responses)
{
try { await res.WriteAsync(": heartbeat\n\n"); await res.Body.FlushAsync(); }
catch { /* client disconnected */ }
}
}
}
});
static async Task BroadcastToUser(
ConcurrentDictionary<string, ConcurrentBag<HttpResponse>> clients,
string userId, string eventType, string data, string? eventId = null)
{
if (!clients.TryGetValue(userId, out var responses)) return;
var sb = new StringBuilder();
sb.Append($"event: {eventType}\n");
if (eventId != null) sb.Append($"id: {eventId}\n");
sb.Append($"data: {data}\n\n");
var payload = sb.ToString();
foreach (var res in responses)
{
try { await res.WriteAsync(payload); await res.Body.FlushAsync(); }
catch { /* client disconnected */ }
}
}
static string? ValidateToken(string token) => null; // implement JWT validation
app.Run();SSE Client
class SSEClient {
private eventSource: EventSource | null = null;
private url: string;
private token: string;
private reconnectAttempts = 0;
constructor(url: string, token: string) {
this.url = url;
this.token = token;
}
connect() {
const url = `${this.url}?token=${encodeURIComponent(this.token)}`;
this.eventSource = new EventSource(url);
this.eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('Notification:', data);
});
this.eventSource.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
console.log('Message:', data);
});
this.eventSource.onerror = (err) => {
console.error('EventSource error:', err);
if (this.eventSource?.readyState === EventSource.CLOSED) {
this.attemptReconnect();
}
};
console.log('SSE connected');
}
private attemptReconnect() {
this.reconnectAttempts++;
const delay = 1000 * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5));
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(() => this.connect(), delay);
}
disconnect() {
if (this.eventSource) {
this.eventSource.close();
}
}
}
// Usage
const client = new SSEClient('http://localhost:3000/events', 'your-jwt-token');
client.connect();// Java SSE Client using OkHttp
// Maven: com.squareup.okhttp3:okhttp
import okhttp3.*;
import okio.BufferedSource;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class SseClient {
private final OkHttpClient httpClient;
private final String url;
private final String token;
private int reconnectAttempts = 0;
private final int maxReconnectAttempts = 5;
public SseClient(String url, String token) {
this.url = url;
this.token = token;
this.httpClient = new OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS) // No timeout for streaming
.build();
}
public void connect() {
Request request = new Request.Builder()
.url(url + "?token=" + token)
.build();
httpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
System.err.println("SSE connection failed: " + e.getMessage());
attemptReconnect();
}
@Override
public void onResponse(Call call, Response response) throws IOException {
reconnectAttempts = 0;
System.out.println("SSE connected");
try (ResponseBody body = response.body()) {
if (body == null) return;
BufferedSource source = body.source();
String eventType = "";
while (!source.exhausted()) {
String line = source.readUtf8Line();
if (line == null) break;
if (line.startsWith("event:")) {
eventType = line.substring(6).trim();
} else if (line.startsWith("data:")) {
String data = line.substring(5).trim();
handleEvent(eventType, data);
eventType = "";
}
}
}
attemptReconnect();
}
});
}
private void handleEvent(String eventType, String data) {
switch (eventType) {
case "notification" -> System.out.println("Notification: " + data);
case "message" -> System.out.println("Message: " + data);
default -> System.out.println("Event [" + eventType + "]: " + data);
}
}
private void attemptReconnect() {
if (reconnectAttempts < maxReconnectAttempts) {
long delay = 1000L * (1L << Math.min(reconnectAttempts, 5));
reconnectAttempts++;
System.out.println("Reconnecting in " + delay + "ms...");
try { Thread.sleep(delay); } catch (InterruptedException ignored) {}
connect();
}
}
public void disconnect() {
httpClient.dispatcher().cancelAll();
}
}
// Usage
var client = new SseClient("http://localhost:3000/events", "your-jwt-token");
client.connect();# Python SSE Client
# pip install httpx
import asyncio
import json
import math
import httpx
class SSEClient:
def __init__(self, url: str, token: str):
self.url = url
self.token = token
self.reconnect_attempts = 0
async def connect(self):
stream_url = f"{self.url}?token={self.token}"
print("SSE connected")
try:
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream("GET", stream_url) as response:
self.reconnect_attempts = 0
event_type = ""
async for line in response.aiter_lines():
if line.startswith("event:"):
event_type = line[6:].strip()
elif line.startswith("data:"):
data = line[5:].strip()
self._handle_event(event_type, data)
event_type = ""
except Exception as e:
print(f"EventSource error: {e}")
await self._attempt_reconnect()
def _handle_event(self, event_type: str, data: str):
parsed = json.loads(data) if data.startswith("{") else data
if event_type == "notification":
print("Notification:", parsed)
elif event_type == "message":
print("Message:", parsed)
else:
print(f"Event [{event_type}]:", parsed)
async def _attempt_reconnect(self):
delay = 1000 * (2 ** min(self.reconnect_attempts, 5)) / 1000
self.reconnect_attempts += 1
print(f"Reconnecting in {delay:.0f}s...")
await asyncio.sleep(delay)
await self.connect()
# Usage
async def main():
client = SSEClient("http://localhost:3000/events", "your-jwt-token")
await client.connect()
asyncio.run(main())// C# SSE Client (.NET 8)
using System.Net.Http.Headers;
using System.Text.Json;
public class SseClient
{
private readonly HttpClient _httpClient;
private readonly string _url;
private readonly string _token;
private int _reconnectAttempts = 0;
private const int MaxReconnectAttempts = 5;
public SseClient(string url, string token)
{
_url = url;
_token = token;
_httpClient = new HttpClient { Timeout = System.Threading.Timeout.InfiniteTimeSpan };
}
public async Task ConnectAsync(CancellationToken cancellationToken = default)
{
var uri = $"{_url}?token={Uri.EscapeDataString(_token)}";
Console.WriteLine("SSE connected");
try
{
using var response = await _httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
response.EnsureSuccessStatusCode();
_reconnectAttempts = 0;
using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
using var reader = new StreamReader(stream);
string eventType = "";
while (!reader.EndOfStream && !cancellationToken.IsCancellationRequested)
{
var line = await reader.ReadLineAsync(cancellationToken);
if (line == null) break;
if (line.StartsWith("event:"))
eventType = line[6..].Trim();
else if (line.StartsWith("data:"))
{
var data = line[5..].Trim();
HandleEvent(eventType, data);
eventType = "";
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine($"EventSource error: {ex.Message}");
await AttemptReconnectAsync(cancellationToken);
}
}
private void HandleEvent(string eventType, string data)
{
switch (eventType)
{
case "notification":
Console.WriteLine($"Notification: {data}");
break;
case "message":
Console.WriteLine($"Message: {data}");
break;
default:
Console.WriteLine($"Event [{eventType}]: {data}");
break;
}
}
private async Task AttemptReconnectAsync(CancellationToken cancellationToken)
{
if (_reconnectAttempts < MaxReconnectAttempts)
{
var delay = 1000 * (int)Math.Pow(2, Math.Min(_reconnectAttempts, 5));
_reconnectAttempts++;
Console.WriteLine($"Reconnecting in {delay}ms...");
await Task.Delay(delay, cancellationToken);
await ConnectAsync(cancellationToken);
}
}
}
// Usage
var client = new SseClient("http://localhost:3000/events", "your-jwt-token");
await client.ConnectAsync();Authentication for Persistent Connections
Token in Query Parameter
Pros: Works with standard HTTP clients, simple. Cons: Token visible in logs, less secure over unencrypted connections.
const token = url.searchParams.get('token');
const decoded = verifyToken(token);
Token in Cookie
Pros: Automatic with every request, secure (HttpOnly flag). Cons: Requires CORS preflight for cross-origin.
const token = req.cookies.auth_token;
const decoded = verifyToken(token);
Token in First Message
Pros: Clean separation, supports bidirectional auth. Cons: Window of unauthenticated connection.
ws.on('message', (data) => {
if (!ws.authenticated) {
const { token } = JSON.parse(data);
ws.authenticated = verifyToken(token);
if (!ws.authenticated) {
ws.close(4001, 'Authentication failed');
}
return;
}
// Process authenticated message
});// Spring WebSocket TextWebSocketHandler
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
if (!Boolean.TRUE.equals(session.getAttributes().get("authenticated"))) {
JsonNode payload = objectMapper.readTree(message.getPayload());
String token = payload.get("token").asText();
boolean valid = jwtService.validateToken(token);
session.getAttributes().put("authenticated", valid);
if (!valid) {
session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Authentication failed"));
}
return;
}
// Process authenticated message
}# FastAPI WebSocket: authenticate via first message
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
authenticated = False
async for data in websocket.iter_text():
if not authenticated:
payload = json.loads(data)
try:
verify_token(payload.get("token"))
authenticated = True
except Exception:
await websocket.close(code=4001, reason="Authentication failed")
return
continue
# Process authenticated message
await handle_message(websocket, data)// ASP.NET Core WebSocket: authenticate via first message
public async Task HandleAsync(WebSocket ws, CancellationToken ct)
{
bool authenticated = false;
var buffer = new byte[4096];
while (ws.State == WebSocketState.Open)
{
var result = await ws.ReceiveAsync(buffer, ct);
var text = Encoding.UTF8.GetString(buffer, 0, result.Count);
if (!authenticated)
{
var payload = JsonSerializer.Deserialize<AuthMessage>(text);
if (!_jwtService.ValidateToken(payload?.Token))
{
await ws.CloseAsync(
(WebSocketCloseStatus)4001, "Authentication failed", ct);
return;
}
authenticated = true;
continue;
}
// Process authenticated message
await HandleMessageAsync(ws, text, ct);
}
}Production Recommendation: Use secure cookies (HttpOnly, Secure, SameSite) or embed the token in WebSocket URL with short expiration.
Heartbeat and Ping-Pong
Why Heartbeat?
Long-lived connections may be terminated silently by proxies, firewalls, or load balancers. A heartbeat detects these dead connections before the client/server tries to send.
WebSocket Ping-Pong
WebSocket has built-in PING (0x9) and PONG (0xA) opcodes. The server sends PING, the client responds with PONG.
// Server: Send ping every 30 seconds
setInterval(() => {
wss.clients.forEach((ws: AuthenticatedWebSocket) => {
if (ws.isAlive === false) {
ws.terminate();
return;
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
// Client receives pong automatically; no handler needed unless custom logic// Spring WebSocket: schedule server-side ping via SimpMessagingTemplate
@Scheduled(fixedRate = 30000)
public void sendHeartbeat() {
for (WebSocketSession session : activeSessions.values()) {
if (!session.isOpen()) {
activeSessions.remove(session.getId());
continue;
}
try {
session.sendMessage(new PingMessage());
} catch (IOException e) {
log.warn("Ping failed for session {}", session.getId());
activeSessions.remove(session.getId());
}
}
}
// PongMessage is handled automatically by Spring WebSocket# FastAPI: periodic ping every 30 seconds
import asyncio
async def websocket_with_heartbeat(websocket: WebSocket):
await websocket.accept()
async def ping_task():
while True:
await asyncio.sleep(30)
try:
await websocket.send_text('{"type":"ping"}')
except Exception:
break
task = asyncio.create_task(ping_task())
try:
async for message in websocket.iter_text():
await handle_message(websocket, message)
finally:
task.cancel()// ASP.NET Core: periodic ping every 30 seconds
private async Task HeartbeatAsync(WebSocket ws, CancellationToken ct)
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(30));
while (await timer.WaitForNextTickAsync(ct) && ws.State == WebSocketState.Open)
{
var ping = Encoding.UTF8.GetBytes("{\"type\":\"ping\"}");
await ws.SendAsync(ping, WebSocketMessageType.Text, true, ct);
}
}
// Call Task.WhenAny(ReceiveLoopAsync(...), HeartbeatAsync(...)) to run bothSSE Heartbeat
SSE uses comment frames or custom events:
setInterval(() => {
sseClients.forEach((clients) => {
clients.forEach((client) => {
client.res.write(': heartbeat\n\n');
});
});
}, 30000);// Spring: SSE heartbeat via SseEmitter
@Scheduled(fixedRate = 30000)
public void sendSseHeartbeat() {
List<SseEmitter> dead = new ArrayList<>();
for (SseEmitter emitter : sseEmitters) {
try {
emitter.send(SseEmitter.event().comment("heartbeat"));
} catch (IOException e) {
dead.add(emitter);
}
}
sseEmitters.removeAll(dead);
}# FastAPI: SSE heartbeat via comment frame
async def sse_with_heartbeat():
while True:
yield ": heartbeat\n\n"
await asyncio.sleep(30)
@app.get("/events")
async def sse_endpoint():
return StreamingResponse(sse_with_heartbeat(), media_type="text/event-stream")// ASP.NET Core: SSE heartbeat via comment frame
private async IAsyncEnumerable<string> SseWithHeartbeatAsync(
[EnumeratorCancellation] CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
yield return ": heartbeat\n\n";
await Task.Delay(TimeSpan.FromSeconds(30), ct);
}
}Reconnection Strategies
Exponential Backoff
Retry with increasing delays to avoid thundering herd:
private attemptReconnect() {
const maxDelay = 60000; // 60 seconds max
const delay = Math.min(
this.reconnectDelay * Math.pow(2, this.reconnectAttempts),
maxDelay,
);
this.reconnectAttempts++;
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => this.connect(), delay);
}// Exponential backoff reconnect
private void attemptReconnect() {
long maxDelay = 60_000L;
long delay = Math.min(
reconnectDelay * (long) Math.pow(2, reconnectAttempts),
maxDelay
);
reconnectAttempts++;
log.info("Reconnecting in {}ms (attempt {})", delay, reconnectAttempts);
scheduler.schedule(this::connect, delay, TimeUnit.MILLISECONDS);
}# Exponential backoff reconnect
async def attempt_reconnect(self):
max_delay = 60.0 # seconds
delay = min(self.reconnect_delay * (2 ** self.reconnect_attempts), max_delay)
self.reconnect_attempts += 1
print(f"Reconnecting in {delay:.1f}s (attempt {self.reconnect_attempts})")
await asyncio.sleep(delay)
await self.connect()// Exponential backoff reconnect
private async Task AttemptReconnectAsync(CancellationToken ct)
{
const double maxDelayMs = 60_000;
double delay = Math.Min(
_reconnectDelay * Math.Pow(2, _reconnectAttempts),
maxDelayMs
);
_reconnectAttempts++;
_logger.LogInformation("Reconnecting in {Delay}ms (attempt {Attempt})", delay, _reconnectAttempts);
await Task.Delay(TimeSpan.FromMilliseconds(delay), ct);
await ConnectAsync(ct);
}Jitter
Add randomness to prevent synchronized reconnections:
const jitter = Math.random() * 1000;
const delayWithJitter = delay + jitter;
setTimeout(() => this.connect(), delayWithJitter);
State Preservation
Cache messages during disconnection, replay on reconnect:
class RobustSSEClient {
private messageBuffer: any[] = [];
private lastEventId: string | null = null;
private handleMessage(event: MessageEvent) {
this.messageBuffer.push(JSON.parse(event.data));
this.lastEventId = event.lastEventId || this.lastEventId;
// Process message...
}
private reconnect() {
const url = `${this.url}?token=${this.token}`;
if (this.lastEventId) {
// Browser EventSource sends Last-Event-ID header automatically
// Server can use it to resume from checkpoint
}
this.eventSource = new EventSource(url);
}
}// Spring WebSocket: resume from last event ID
public class RobustSseClient {
private final List<Object> messageBuffer = new ArrayList<>();
private String lastEventId = null;
public void handleMessage(String data, String eventId) {
messageBuffer.add(parseJson(data));
if (eventId != null) lastEventId = eventId;
// Process message...
}
public void reconnect(String url, String token) {
WebClient.Builder builder = WebClient.builder().baseUrl(url);
if (lastEventId != null) {
builder.defaultHeader("Last-Event-ID", lastEventId);
}
builder.build()
.get()
.uri(uriBuilder -> uriBuilder.queryParam("token", token).build())
.retrieve()
.bodyToFlux(ServerSentEvent.class)
.subscribe(event -> handleMessage(
(String) event.data(), event.id()));
}
}# Python: SSE client with Last-Event-ID resume
import httpx
class RobustSseClient:
def __init__(self, url: str, token: str):
self.url = url
self.token = token
self.message_buffer: list = []
self.last_event_id: str | None = None
async def connect(self):
headers = {}
if self.last_event_id:
headers["Last-Event-ID"] = self.last_event_id
async with httpx.AsyncClient() as client:
async with client.stream(
"GET", self.url,
params={"token": self.token},
headers=headers,
) as response:
async for line in response.aiter_lines():
if line.startswith("id:"):
self.last_event_id = line[3:].strip()
elif line.startswith("data:"):
data = json.loads(line[5:].strip())
self.message_buffer.append(data)
# Process message...// ASP.NET Core: SSE client with Last-Event-ID resume
public class RobustSseClient
{
private readonly HttpClient _http;
private readonly string _url;
private readonly string _token;
private readonly List<object> _messageBuffer = new();
private string? _lastEventId;
public async Task ConnectAsync(CancellationToken ct)
{
using var request = new HttpRequestMessage(HttpMethod.Get,
$"{_url}?token={_token}");
if (_lastEventId is not null)
request.Headers.TryAddWithoutValidation("Last-Event-ID", _lastEventId);
using var response = await _http.SendAsync(
request, HttpCompletionOption.ResponseHeadersRead, ct);
using var reader = new StreamReader(await response.Content.ReadAsStreamAsync(ct));
string? line;
while ((line = await reader.ReadLineAsync()) is not null)
{
if (line.StartsWith("id:")) _lastEventId = line[3..].Trim();
else if (line.StartsWith("data:"))
{
var data = JsonSerializer.Deserialize<object>(line[5..].Trim());
_messageBuffer.Add(data!);
// Process message...
}
}
}
}Scaling with Redis Pub/Sub
When deploying across multiple servers, WebSocket connections on Server A can’t directly message connections on Server B. Redis pub/sub acts as a message broker:
import redis from 'redis';
const redisClient = redis.createClient();
const redisSubscriber = redis.createClient();
await redisClient.connect();
await redisSubscriber.connect();
const userConnections = new Map<string, Set<AuthenticatedWebSocket>>();
wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
const userId = (req as any).userId;
ws.userId = userId;
if (!userConnections.has(userId)) {
userConnections.set(userId, new Set());
}
userConnections.get(userId)!.add(ws);
ws.on('message', async (data) => {
const message = JSON.parse(data.toString());
if (message.type === 'chat') {
// Publish to Redis so other servers can route to this user
await redisClient.publish(`user:${message.recipientId}:messages`, JSON.stringify({
from: userId,
text: message.text,
timestamp: new Date(),
}));
}
});
});
// Subscribe to user-specific channels
async function subscribeToUser(userId: string) {
await redisSubscriber.subscribe(`user:${userId}:messages`, (message) => {
const parsedMessage = JSON.parse(message);
const connections = userConnections.get(userId);
if (connections) {
connections.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'chat',
from: parsedMessage.from,
text: parsedMessage.text,
}));
}
});
}
});
}
wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
const userId = (req as any).userId;
subscribeToUser(userId);
});// Spring Boot WebSocket + Redis Pub/Sub
// pom.xml: spring-boot-starter-data-redis, spring-boot-starter-websocket
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class RedisWebSocketBridge {
private final StringRedisTemplate redisTemplate;
private final SimpMessagingTemplate wsTemplate;
private final RedisMessageListenerContainer listenerContainer;
private final Map<String, MessageListener> listeners = new ConcurrentHashMap<>();
public RedisWebSocketBridge(
StringRedisTemplate redisTemplate,
SimpMessagingTemplate wsTemplate,
RedisMessageListenerContainer listenerContainer) {
this.redisTemplate = redisTemplate;
this.wsTemplate = wsTemplate;
this.listenerContainer = listenerContainer;
}
// Called when a user connects
public void subscribeToUser(String userId) {
String channel = "user:" + userId + ":messages";
MessageListener listener = (Message message, byte[] pattern) -> {
String payload = new String(message.getBody());
// Route Redis message to the user's WebSocket queue
wsTemplate.convertAndSendToUser(userId, "/queue/messages", payload);
};
listeners.put(userId, listener);
listenerContainer.addMessageListener(listener, new PatternTopic(channel));
}
// Called when a message should be sent to another user
public void publishToUser(String recipientId, String fromUserId, String text) {
String channel = "user:" + recipientId + ":messages";
String payload = String.format(
"{\"from\":\"%s\",\"text\":\"%s\",\"timestamp\":\"%s\"}",
fromUserId, text, java.time.Instant.now()
);
redisTemplate.convertAndSend(channel, payload);
}
// Called when a user disconnects
public void unsubscribeFromUser(String userId) {
MessageListener listener = listeners.remove(userId);
if (listener != null) {
listenerContainer.removeMessageListener(listener);
}
}
}# FastAPI WebSocket + Redis Pub/Sub
# pip install fastapi uvicorn redis websockets
import asyncio
import json
from collections import defaultdict
from typing import Dict, Set
import redis.asyncio as aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
app = FastAPI()
redis_client = aioredis.from_url("redis://localhost")
redis_subscriber = aioredis.from_url("redis://localhost")
user_connections: Dict[str, Set[WebSocket]] = defaultdict(set)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
user_id = validate_token(token) # implement JWT validation
if not user_id:
await websocket.close(code=4001)
return
await websocket.accept()
user_connections[user_id].add(websocket)
# Subscribe to this user's Redis channel
subscribe_task = asyncio.create_task(subscribe_to_user(user_id))
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
if message["type"] == "chat":
# Publish to Redis — any server can pick this up
await redis_client.publish(
f"user:{message['recipientId']}:messages",
json.dumps({
"from": user_id,
"text": message["text"],
"timestamp": asyncio.get_event_loop().time(),
}),
)
except WebSocketDisconnect:
user_connections[user_id].discard(websocket)
subscribe_task.cancel()
async def subscribe_to_user(user_id: str):
channel = f"user:{user_id}:messages"
async with redis_subscriber.pubsub() as pubsub:
await pubsub.subscribe(channel)
async for raw_message in pubsub.listen():
if raw_message["type"] != "message":
continue
payload = json.loads(raw_message["data"])
for ws in list(user_connections.get(user_id, [])):
try:
await ws.send_text(json.dumps({
"type": "chat",
"from": payload["from"],
"text": payload["text"],
}))
except Exception:
user_connections[user_id].discard(ws)
def validate_token(token: str):
return None # implement JWT validation// ASP.NET Core WebSocket + Redis Pub/Sub (.NET 8)
// NuGet: StackExchange.Redis, Microsoft.AspNetCore.WebSockets
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using StackExchange.Redis;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<IConnectionMultiplexer>(
ConnectionMultiplexer.Connect("localhost:6379"));
var app = builder.Build();
app.UseWebSockets();
var userConnections = new ConcurrentDictionary<string, ConcurrentBag<WebSocket>>();
var redis = app.Services.GetRequiredService<IConnectionMultiplexer>();
var subscriber = redis.GetSubscriber();
app.Map("/ws", async (HttpContext context) =>
{
if (!context.WebSockets.IsWebSocketRequest) { context.Response.StatusCode = 400; return; }
var token = context.Request.Query["token"].ToString();
var userId = ValidateToken(token);
if (userId == null) { context.Response.StatusCode = 401; return; }
var ws = await context.WebSockets.AcceptWebSocketAsync();
userConnections.GetOrAdd(userId, _ => new ConcurrentBag<WebSocket>()).Add(ws);
// Subscribe to this user's Redis channel
await subscriber.SubscribeAsync($"user:{userId}:messages", async (channel, value) =>
{
var payload = JsonSerializer.Deserialize<JsonElement>(value!);
var outgoing = JsonSerializer.Serialize(new
{
type = "chat",
from = payload.GetProperty("from").GetString(),
text = payload.GetProperty("text").GetString(),
});
if (ws.State == WebSocketState.Open)
await ws.SendAsync(Encoding.UTF8.GetBytes(outgoing),
WebSocketMessageType.Text, true, CancellationToken.None);
});
var buffer = new byte[4096];
var db = redis.GetDatabase();
while (ws.State == WebSocketState.Open)
{
var result = await ws.ReceiveAsync(buffer, CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close) break;
var json = Encoding.UTF8.GetString(buffer, 0, result.Count);
var message = JsonSerializer.Deserialize<JsonElement>(json);
if (message.GetProperty("type").GetString() == "chat")
{
var recipientId = message.GetProperty("recipientId").GetString()!;
var pubPayload = JsonSerializer.Serialize(new
{
from = userId,
text = message.GetProperty("text").GetString(),
timestamp = DateTime.UtcNow,
});
// Publish — any server subscribed to this channel will receive it
await db.PublishAsync($"user:{recipientId}:messages", pubPayload);
}
}
await subscriber.UnsubscribeAsync($"user:{userId}:messages");
});
static string? ValidateToken(string token) => null; // implement JWT validation
app.Run();Pattern Summary:
- Client sends message to Server A.
- Server A publishes to Redis channel
user:RECIPIENT:messages. - Server B (where recipient is connected) receives the message from Redis.
- Server B routes the message to the recipient’s WebSocket connection.
Backpressure and Flow Control
The Problem
If a client sends messages faster than the server processes them, the internal buffer grows, consuming memory. Similarly, if the client can’t receive messages (slow network), the server’s send buffer fills.
WebSocket Backpressure
ws.on('message', async (data) => {
const message = JSON.parse(data.toString());
// Process asynchronously; don't block the event loop
try {
await processMessage(message);
} catch (err) {
console.error('Error processing message:', err);
}
});
// Check write buffer before sending
function sendMessage(ws: WebSocket, data: any) {
const serialized = JSON.stringify(data);
if (ws.bufferedAmount > 16 * 1024) {
console.warn('Write buffer exceeded 16KB; apply backpressure');
// Option 1: Drop message (acceptable for real-time streams)
// Option 2: Queue and retry later
// Option 3: Pause reading from client
return false;
}
ws.send(serialized);
return true;
}// Spring WebSocket backpressure with bounded queue
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@Component
public class BackpressureWebSocketSender {
private static final int MAX_QUEUE_SIZE = 1000;
private static final int WARN_THRESHOLD = 800;
private final SimpMessagingTemplate messagingTemplate;
public BackpressureWebSocketSender(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// Per-user bounded queue to absorb bursts and apply backpressure
private final java.util.concurrent.ConcurrentHashMap<String, BlockingQueue<Object>>
userQueues = new java.util.concurrent.ConcurrentHashMap<>();
public boolean sendToUser(String userId, Object message) {
BlockingQueue<Object> queue = userQueues.computeIfAbsent(
userId, k -> new ArrayBlockingQueue<>(MAX_QUEUE_SIZE)
);
if (queue.size() >= WARN_THRESHOLD) {
System.out.println("Backpressure warning for user " + userId
+ ": queue=" + queue.size());
}
boolean accepted = queue.offer(message); // non-blocking; drops if full
if (!accepted) {
System.out.println("Write buffer exceeded for user " + userId + "; dropping message");
return false;
}
// Drain queue and forward to WebSocket
Object pending;
while ((pending = queue.poll()) != null) {
messagingTemplate.convertAndSendToUser(userId, "/queue/messages", pending);
}
return true;
}
}# FastAPI WebSocket backpressure with asyncio.Queue
import asyncio
import json
from typing import Dict
MAX_QUEUE_SIZE = 1000
WARN_THRESHOLD = 800
send_queues: Dict[str, asyncio.Queue] = {}
async def send_with_backpressure(user_id: str, ws, data: dict) -> bool:
if user_id not in send_queues:
send_queues[user_id] = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
queue = send_queues[user_id]
if queue.qsize() >= WARN_THRESHOLD:
print(f"Backpressure warning for user {user_id}: queue={queue.qsize()}")
try:
queue.put_nowait(data) # Raises QueueFull if at capacity
except asyncio.QueueFull:
print(f"Write buffer exceeded 16KB equivalent for user {user_id}; dropping message")
return False
# Drain queue
while not queue.empty():
item = queue.get_nowait()
try:
await ws.send_text(json.dumps(item))
except Exception:
return False
return True
async def process_message(message: dict):
# Simulate async processing
await asyncio.sleep(0)// ASP.NET Core WebSocket backpressure with Channel<T>
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
public class BackpressureWebSocketSender
{
private const int MaxQueueSize = 1000;
private const int WarnThreshold = 800;
private readonly ConcurrentDictionary<string, Channel<object>> _userChannels = new();
public bool Enqueue(string userId, object message)
{
var channel = _userChannels.GetOrAdd(userId, _ =>
Channel.CreateBounded<object>(new BoundedChannelOptions(MaxQueueSize)
{
FullMode = BoundedChannelFullMode.DropOldest, // backpressure: drop oldest
SingleWriter = false,
SingleReader = true,
}));
if (channel.Reader.Count >= WarnThreshold)
Console.WriteLine($"Backpressure warning for user {userId}: queue={channel.Reader.Count}");
if (!channel.Writer.TryWrite(message))
{
Console.WriteLine($"Write buffer exceeded for user {userId}; dropping message");
return false;
}
return true;
}
public async Task DrainToWebSocket(string userId, WebSocket ws, CancellationToken ct)
{
if (!_userChannels.TryGetValue(userId, out var channel)) return;
await foreach (var item in channel.Reader.ReadAllAsync(ct))
{
if (ws.State != WebSocketState.Open) break;
var json = JsonSerializer.Serialize(item);
await ws.SendAsync(Encoding.UTF8.GetBytes(json),
WebSocketMessageType.Text, true, ct);
}
}
}SSE Backpressure
function broadcastToUser(userId: string, data: any): boolean {
const clients = sseClients.get(userId);
if (!clients) return true;
let allSuccess = true;
clients.forEach((client) => {
const canWrite = client.res.write(`data: ${JSON.stringify(data)}\n\n`);
if (!canWrite) {
console.warn(`Backpressure detected for user ${userId}`);
allSuccess = false;
// Apply backpressure: pause reading, implement queue, etc.
}
});
return allSuccess;
}// Java SSE backpressure — check emitter completion before sending
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Set;
public boolean broadcastToUser(String userId, String eventType, Object data,
java.util.Map<String, Set<SseEmitter>> clients) {
Set<SseEmitter> emitters = clients.getOrDefault(userId, Set.of());
boolean allSuccess = true;
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event().name(eventType).data(data));
} catch (IOException e) {
System.out.println("Backpressure detected for user " + userId + ": " + e.getMessage());
allSuccess = false;
// Remove dead emitter
emitter.completeWithError(e);
}
}
return allSuccess;
}# Python SSE backpressure — bounded asyncio.Queue per client
import asyncio
import json
from typing import Dict, Set
MAX_SSE_QUEUE = 100
# Each client gets a bounded queue; if full, backpressure is signaled
sse_queues: Dict[str, Set[asyncio.Queue]] = {}
async def broadcast_to_user(user_id: str, data: dict) -> bool:
queues = sse_queues.get(user_id, set())
all_success = True
for queue in list(queues):
try:
queue.put_nowait({"event": "message", "data": json.dumps(data)})
except asyncio.QueueFull:
print(f"Backpressure detected for user {user_id}")
all_success = False
# Drop oldest item to make room (alternative: drop newest)
queue.get_nowait()
queue.put_nowait({"event": "message", "data": json.dumps(data)})
return all_success// C# SSE backpressure — Channel<T> per response
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;
public class SseBackpressureSender
{
private readonly ConcurrentDictionary<string, Channel<string>> _userChannels = new();
public bool BroadcastToUser(string userId, object data)
{
if (!_userChannels.TryGetValue(userId, out var channel)) return true;
var payload = $"data: {JsonSerializer.Serialize(data)}\n\n";
if (!channel.Writer.TryWrite(payload))
{
Console.WriteLine($"Backpressure detected for user {userId}");
// Apply backpressure: pause reading, implement queue, etc.
return false;
}
return true;
}
public Channel<string> CreateChannelForUser(string userId)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // block writer until reader drains
});
_userChannels[userId] = channel;
return channel;
}
public void RemoveUser(string userId) => _userChannels.TryRemove(userId, out _);
}Comparison: WebSocket vs SSE vs Long Polling
| Feature | WebSocket | SSE | Long Polling |
|---|---|---|---|
| Bidirectional | Yes | No (separate HTTP POST) | No (separate HTTP POST) |
| Latency | Low (< 50ms) | Low (~100ms) | High (1-10s) |
| Connection Overhead | Minimal (2-10 bytes/frame) | Minimal (text-based) | High (new HTTP headers per request) |
| Server Push | Native | Native | Simulated (client polls) |
| Browser Support | Modern browsers | Modern browsers | All browsers |
| Proxy Compatibility | Good (needs WebSocket-aware proxies) | Excellent (standard HTTP) | Excellent (standard HTTP) |
| Scalability | Medium (persistent connections) | Medium (persistent connections) | Easy (stateless) |
| Use Case | Chat, gaming, real-time dashboards | Notifications, log streaming | Legacy systems, strict proxies |
Production Checklist
Connection Management
-
Connection Limits: Set per-server and per-user limits to prevent resource exhaustion.
const MAX_CONNECTIONS_PER_SERVER = 10000; const MAX_CONNECTIONS_PER_USER = 5; if (userConnections.get(userId)?.size >= MAX_CONNECTIONS_PER_USER) { ws.close(4029, 'Too many connections'); return; } -
Timeout Handling: Close idle connections.
const CONNECTION_IDLE_TIMEOUT = 5 * 60 * 1000; // 5 minutes ws.on('message', () => { clearTimeout(ws.idleTimer); ws.idleTimer = setTimeout(() => ws.close(4000, 'Idle timeout'), CONNECTION_IDLE_TIMEOUT); });
Load Balancer Configuration
- Sticky Sessions: Route a user’s requests to the same server to ensure WebSocket upgrade success.
# Nginx example upstream backend { least_conn; server backend1; server backend2; server backend3; } server { location /ws { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_read_timeout 86400; # Long timeout for persistent connections } }
Memory Management
-
Monitor Memory Usage: Use
process.memoryUsage()and set up alerts.setInterval(() => { const mem = process.memoryUsage(); console.log(`Memory: ${(mem.heapUsed / 1024 / 1024).toFixed(2)}MB / ${(mem.heapTotal / 1024 / 1024).toFixed(2)}MB`); if (mem.heapUsed > 1024 * 1024 * 1024) { // 1GB console.error('Heap usage critical; closing new connections'); // Reject new connections, gracefully close old ones } }, 10000); -
Message Queue Limits: Prevent unbounded growth of pending messages.
const MAX_MESSAGE_QUEUE = 1000; if (messageQueue.length > MAX_MESSAGE_QUEUE) { messageQueue.shift(); // Drop oldest }
Monitoring and Logging
-
Connection Metrics:
- Active WebSocket connections
- Connections by user/server
- Message throughput (msg/sec)
- Latency percentiles (p50, p95, p99)
- Error rates and close codes
-
Structured Logging:
logger.info('ws:connected', { userId, remoteAddress: req.socket.remoteAddress, timestamp: new Date() }); logger.error('ws:error', { userId, error: err.message, code: err.code }); logger.warn('ws:backpressure', { userId, bufferedAmount: ws.bufferedAmount });
Graceful Shutdown
function gracefulShutdown() {
console.log('Shutting down gracefully...');
// Stop accepting new connections
server.close();
// Close all WebSocket connections
wss.clients.forEach((ws) => {
ws.close(1001, 'Server shutting down');
});
// Wait for graceful closure
setTimeout(() => {
console.log('Force shutting down');
process.exit(1);
}, 30000); // 30 seconds
}
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);// Spring Boot graceful shutdown
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class GracefulWebSocketShutdown {
private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final SimpMessagingTemplate messagingTemplate;
public GracefulWebSocketShutdown(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
public void registerSession(String sessionId, WebSocketSession session) {
sessions.put(sessionId, session);
}
@EventListener(ContextClosedEvent.class)
public void onShutdown() {
System.out.println("Shutting down gracefully...");
// Notify all connected clients
messagingTemplate.convertAndSend("/topic/shutdown",
java.util.Map.of("reason", "Server shutting down"));
// Close all sessions
for (WebSocketSession session : sessions.values()) {
try {
session.close(CloseStatus.SERVICE_RESTARTED);
} catch (IOException e) {
System.err.println("Error closing session: " + e.getMessage());
}
}
System.out.println("All WebSocket sessions closed");
}
}# FastAPI graceful shutdown
import asyncio
import json
import signal
from typing import Dict, Set
from fastapi import FastAPI, WebSocket
app = FastAPI()
active_websockets: Set[WebSocket] = set()
async def graceful_shutdown():
print("Shutting down gracefully...")
# Notify all clients
close_tasks = []
for ws in list(active_websockets):
try:
await ws.send_text(json.dumps({"type": "server_shutdown"}))
close_tasks.append(ws.close(1001))
except Exception:
pass
if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True)
print("All WebSocket connections closed")
@app.on_event("shutdown")
async def shutdown_event():
await graceful_shutdown()
# uvicorn handles SIGTERM/SIGINT and calls the shutdown event// ASP.NET Core graceful shutdown (.NET 8)
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Hosting;
public class GracefulShutdownService : IHostedService
{
public static readonly ConcurrentBag<WebSocket> ActiveSockets = new();
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public async Task StopAsync(CancellationToken cancellationToken)
{
Console.WriteLine("Shutting down gracefully...");
var shutdownMsg = JsonSerializer.Serialize(new { type = "server_shutdown" });
var bytes = Encoding.UTF8.GetBytes(shutdownMsg);
var tasks = ActiveSockets
.Where(ws => ws.State == WebSocketState.Open)
.Select(async ws =>
{
try
{
await ws.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None);
await ws.CloseAsync(WebSocketCloseStatus.EndpointUnavailable,
"Server shutting down", CancellationToken.None);
}
catch { /* ignore — client may have already disconnected */ }
});
// Wait up to 30 seconds for graceful closure
await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(30));
Console.WriteLine("All WebSocket connections closed");
}
}
// Register in Program.cs:
// builder.Services.AddHostedService<GracefulShutdownService>();Testing
- Connection Stability: Simulate network conditions (latency, packet loss, disconnects).
- Load Testing: Test with thousands of concurrent connections.
- Protocol Compliance: Verify frame structure, opcodes, and close codes.
Conclusion
WebSocket and SSE are powerful tools for real-time communication. WebSocket excels for bidirectional, low-latency scenarios (chat, gaming, collaborative editing). SSE shines for simple server-push use cases (notifications, log streaming) with better proxy compatibility.
Scaling persistent connections requires coordination via message brokers, careful memory management, and production-grade monitoring. With proper architecture—heartbeats, reconnection strategies, load balancing, and graceful shutdown—both protocols support millions of concurrent users.
Choose WebSocket for full bidirectionality and latency-critical apps; choose SSE for simplicity and proxy compatibility. And always measure, monitor, and test under realistic load.