
WebSocket and SSE Communication Server-to-Server Communication Technologies

Palakorn Voramongkol
April 20, 2025 13 min read

“A comprehensive guide to WebSocket and Server-Sent Events for real-time communication — covering the WebSocket protocol, SSE, scaling persistent connections, heartbeats, reconnection, and production patterns.”

Deep Dive: WebSocket and SSE Communication

Real-time communication is foundational for modern applications. Whether you’re building chat applications, live notifications, collaborative editing tools, or trading dashboards, choosing the right protocol matters. This guide explores WebSocket and Server-Sent Events (SSE)—two powerful mechanisms for persistent, bidirectional, or server-initiated communication—and covers production patterns for scaling and reliability.

What are WebSocket and SSE?

Traditional HTTP follows a request-response model: the client initiates, the server responds, the connection closes. This pattern doesn’t fit real-time scenarios where the server needs to push updates independently to clients.

WebSocket and SSE solve this problem through persistent connections, but they take different approaches:

  • WebSocket: Upgrades an HTTP connection to a full-duplex, bidirectional protocol. Once established, either client or server can send messages at any time over the same TCP connection. Built on TCP, it provides low latency and supports binary frames.

  • Server-Sent Events (SSE): Uses HTTP as the transport and establishes a one-way channel where the server pushes text-based events to the client. The client can still send messages via separate HTTP requests. Simpler, uses standard HTTP infrastructure, but unidirectional.

The sequence diagram below illustrates a typical WebSocket flow:

sequenceDiagram
    participant Client
    participant Server
    
    Note over Client,Server: WebSocket Upgrade Handshake
    Client->>Server: HTTP Upgrade Request (Connection, Upgrade headers)
    Server->>Client: HTTP 101 Switching Protocols
    
    Note over Client,Server: Persistent Connection Established
    Server->>Client: Push Update (Frame)
    Client->>Server: Client Message (Frame)
    Server->>Client: Push Update (Frame)
    Client->>Server: Ping
    Server->>Client: Pong
    
    Note over Client,Server: Connection Close
    Client->>Server: Close Frame
    Server->>Client: Close Frame (Acknowledgment)

Core Principles

  • Persistent Connections: Both WebSocket and SSE maintain open TCP connections, eliminating the overhead of repeated HTTP handshakes.
  • Low Latency: Messages flow immediately without polling delays or request round-trips.
  • Resource Efficiency: A single connection handles all bidirectional (WebSocket) or server-initiated (SSE) messages.
  • Scalability Challenges: Persistent connections consume server resources; scaling horizontally requires coordination via message brokers (Redis pub/sub).
  • Protocol Overhead: WebSocket frames add minimal overhead (a 2-14 byte header per frame, depending on payload length and masking). SSE carries events as text lines over a normal HTTP response, which is slightly heavier but works with standard proxies.
  • Browser Support: Both are well-supported in modern browsers; WebSocket requires explicit protocol negotiation, SSE uses EventSource API.
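
The scaling point above can be sketched in a few lines. This is a minimal illustration rather than a production design: InMemoryBroker is a hypothetical stand-in for a real broker such as Redis pub/sub, and the Envelope shape, the channel name, and the ServerInstance class are assumptions made for this example.

```typescript
// Envelope published to the shared channel; the shape is an assumption for this sketch.
interface Envelope {
  userId: string;
  message: unknown;
}

type Handler = (payload: string) => void;

// Hypothetical in-memory stand-in for a broker such as Redis pub/sub.
class InMemoryBroker {
  private handlers = new Map<string, Handler[]>();

  subscribe(channel: string, handler: Handler): void {
    const list = this.handlers.get(channel) ?? [];
    list.push(handler);
    this.handlers.set(channel, list);
  }

  publish(channel: string, payload: string): void {
    for (const handler of this.handlers.get(channel) ?? []) handler(payload);
  }
}

// One application server that only knows about its own local connections.
class ServerInstance {
  private broker: InMemoryBroker;
  private channel: string;
  // userId -> delivery callbacks (stand-ins for open sockets on this instance)
  private local = new Map<string, Array<(message: unknown) => void>>();

  constructor(broker: InMemoryBroker, channel = "user-messages") {
    this.broker = broker;
    this.channel = channel;
    // Every instance listens on the same channel and delivers only to users it holds.
    broker.subscribe(channel, (payload) => {
      const { userId, message } = JSON.parse(payload) as Envelope;
      for (const deliver of this.local.get(userId) ?? []) deliver(message);
    });
  }

  attach(userId: string, deliver: (message: unknown) => void): void {
    const list = this.local.get(userId) ?? [];
    list.push(deliver);
    this.local.set(userId, list);
  }

  // Publish instead of writing to a socket directly: the recipient may be on another instance.
  sendToUser(userId: string, message: unknown): void {
    const envelope: Envelope = { userId, message };
    this.broker.publish(this.channel, JSON.stringify(envelope));
  }
}

// Usage: the user is connected to server B, but the message originates on server A.
const broker = new InMemoryBroker();
const serverA = new ServerInstance(broker);
const serverB = new ServerInstance(broker);
const received: unknown[] = [];
serverB.attach("user123", (message) => received.push(message));
serverA.sendToUser("user123", { type: "chat", text: "Hello!" });
```

The design choice to note is that sendToUser never touches a socket itself. Every instance publishes to the broker and delivers only what it receives back, so the sender does not need to know which instance holds the recipient.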

The WebSocket Protocol

Upgrade Handshake

WebSocket begins with an HTTP upgrade. The client sends an HTTP GET request with specific headers:

GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Version: 13

The server responds with HTTP 101:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=

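The Sec-WebSocket-Key is a base64-encoded 16-byte random nonce. The server computes Sec-WebSocket-Accept by appending the fixed GUID 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 (defined in RFC 6455) to the key, hashing the result with SHA-1, and base64-encoding the digest. A matching value shows that the server actually understood the WebSocket handshake and that the response is not a stale reply from a cache or a non-WebSocket server.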
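
The accept-key derivation is short enough to check by hand. The sketch below uses Node's built-in crypto module; the function name computeAcceptKey is chosen for this example, and the GUID is the one fixed by RFC 6455. Libraries such as ws do this step for you, so this is for understanding rather than something you would normally write.

```typescript
import { createHash } from "node:crypto";

// GUID fixed by RFC 6455 for the opening handshake.
const WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Derive the Sec-WebSocket-Accept value from the client's Sec-WebSocket-Key:
// append the GUID, SHA-1 hash the result, then base64-encode the digest.
function computeAcceptKey(secWebSocketKey: string): string {
  return createHash("sha1")
    .update(secWebSocketKey + WEBSOCKET_GUID)
    .digest("base64");
}

// The key from the request above yields the accept value from the response above.
const accept = computeAcceptKey("x3JJHMbDL1EzLkh9GBhXDw==");
console.log(accept); // HSmrc0sMlYUkAGmm5OPpG2HaGWk=
```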

WebSocket Frames

Once upgraded, data flows as frames. Each frame has:

  • FIN bit (1 bit): Marks the last fragment of a message.
  • Opcode (4 bits): Defines frame type (see below).
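  • Mask bit (1 bit): Client-to-server frames must be masked; server-to-client frames must not be.
  • Payload length (7 bits, 7+16 bits, or 7+64 bits): Message size. Values 0-125 fit in the 7-bit field; 126 and 127 signal that a 16-bit or 64-bit extended length follows.
  • Masking key (32 bits, if masked): XOR key applied to the payload. Masking is not encryption; it stops attacker-controlled payload bytes from being misread as HTTP by intermediaries (cache poisoning).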
  • Payload data: The actual message.

Opcodes

Opcode  Name          Purpose
0x0     Continuation  Continuation of a fragmented message
0x1     Text          UTF-8 text message
0x2     Binary        Binary data
0x8     Close         Initiate connection close
0x9     Ping          Request a pong (connection health check)
0xA     Pong          Respond to a ping
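
To make the frame layout concrete, here is a minimal sketch of a parser for a single, complete frame. It is a simplification for illustration: the names ParsedFrame and parseFrame are made up for this example, the whole frame is assumed to be in the buffer, and the reserved bits, extensions, and fragmentation are ignored. The sample bytes are the masked "Hello" text frame given as an example in RFC 6455.

```typescript
interface ParsedFrame {
  fin: boolean;
  opcode: number;
  masked: boolean;
  headerLength: number; // bytes before the payload starts
  payload: Uint8Array; // payload after unmasking
}

// Parse one complete frame. Assumes the buffer holds the entire frame.
function parseFrame(buf: Uint8Array): ParsedFrame {
  const fin = (buf[0] & 0x80) !== 0; // top bit of byte 0
  const opcode = buf[0] & 0x0f; // low 4 bits of byte 0
  const masked = (buf[1] & 0x80) !== 0; // top bit of byte 1
  let length = buf[1] & 0x7f; // low 7 bits of byte 1
  let offset = 2;

  if (length === 126) {
    // The next 2 bytes hold the length as an unsigned 16-bit big-endian integer
    length = (buf[2] << 8) | buf[3];
    offset += 2;
  } else if (length === 127) {
    // The next 8 bytes hold the length as a 64-bit big-endian integer
    const view = new DataView(buf.buffer, buf.byteOffset + 2, 8);
    length = view.getUint32(0) * 4294967296 + view.getUint32(4);
    offset += 8;
  }

  // The 4-byte masking key is present only when the mask bit is set
  const maskKey = masked ? buf.subarray(offset, offset + 4) : null;
  if (masked) offset += 4;

  // slice() copies, so unmasking does not modify the caller's buffer
  const payload = buf.slice(offset, offset + length);
  if (maskKey) {
    for (let i = 0; i < payload.length; i++) payload[i] ^= maskKey[i % 4];
  }
  return { fin, opcode, masked, headerLength: offset, payload };
}

// Masked single-frame text message containing "Hello" (example from RFC 6455)
const helloFrame = Uint8Array.from([
  0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58,
]);
const parsed = parseFrame(helloFrame);
const text = Array.from(parsed.payload, (b) => String.fromCharCode(b)).join("");
console.log(parsed.opcode, text); // 1 Hello
```

Here the header is 6 bytes: 2 base bytes plus the 4-byte masking key. An unmasked server-to-client frame with the same payload would need only 2.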

Implementing WebSocket with Node.js

WebSocket Server (ws library)

The ws library is a lightweight, production-ready WebSocket server implementation.

import WebSocket, { WebSocketServer } from 'ws';
import http from 'http';
import { verifyToken } from './auth';

const server = http.createServer();
// The WebSocketServer itself is created further down, together with verifyClient

interface AuthenticatedWebSocket extends WebSocket {
  userId?: string;
  isAlive?: boolean;
}

// Track active connections per user
const userConnections = new Map<string, Set<AuthenticatedWebSocket>>();

// Middleware-style authentication, run during the HTTP upgrade
function verifyClient(
  info: { origin: string; secure: boolean; req: http.IncomingMessage },
  callback: (success: boolean, code?: number, message?: string) => void,
) {
  const url = new URL(info.req.url || '', 'http://localhost');
  const token = url.searchParams.get('token');

  if (!token) {
    // The handshake is still plain HTTP at this point, so reject with an HTTP status.
    // WebSocket close codes such as 4001 only apply after the upgrade has completed.
    callback(false, 401, 'Unauthorized');
    return;
  }

  try {
    const decoded = verifyToken(token);
    (info.req as any).userId = decoded.sub;
    callback(true);
  } catch (err) {
    callback(false, 401, 'Unauthorized');
  }
}

const wss = new WebSocketServer({
  server,
  verifyClient,
  clientTracking: false, // Manage connections manually for multi-server setups
});

wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
  const userId = (req as any).userId;
  ws.userId = userId;
  ws.isAlive = true;

  // Track connection
  if (!userConnections.has(userId)) {
    userConnections.set(userId, new Set());
  }
  userConnections.get(userId)!.add(ws);

  console.log(`User ${userId} connected`);

  // Heartbeat
  ws.on('pong', () => {
    ws.isAlive = true;
  });

  // Handle incoming messages
  ws.on('message', async (data: WebSocket.Data) => {
    try {
      const message = JSON.parse(data.toString());

      switch (message.type) {
        case 'chat':
          broadcastToUser(message.recipientId, {
            type: 'chat',
            from: userId,
            text: message.text,
            timestamp: new Date(),
          });
          break;

        case 'presence':
          broadcastToUser(userId, {
            type: 'user_status',
            userId,
            status: message.status,
          });
          break;

        default:
          ws.send(JSON.stringify({ error: 'Unknown message type' }));
      }
    } catch (err) {
      ws.send(JSON.stringify({ error: 'Invalid message format' }));
    }
  });

  // Handle errors
  ws.on('error', (err) => {
    console.error(`WebSocket error for user ${userId}:`, err);
  });

  // Handle close
  ws.on('close', (code, reason) => {
    const connections = userConnections.get(userId);
    if (connections) {
      connections.delete(ws);
      if (connections.size === 0) {
        userConnections.delete(userId);
      }
    }
    console.log(`User ${userId} disconnected (code: ${code})`);
  });
});

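// Heartbeat interval (detects dead connections).
// wss.clients is not populated when clientTracking is false, so walk our own registry.
setInterval(() => {
  userConnections.forEach((connections) => {
    connections.forEach((ws) => {
      if (ws.isAlive === false) {
        // No pong since the last ping; the 'close' handler removes it from the registry
        ws.terminate();
        return;
      }
      ws.isAlive = false;
      ws.ping();
    });
  });
}, 30000);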

function broadcastToUser(userId: string, message: any) {
  const connections = userConnections.get(userId);
  if (connections) {
    connections.forEach((ws) => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify(message));
      }
    });
  }
}

server.listen(3000, () => {
  console.log('WebSocket server listening on port 3000');
});

// Spring Boot WebSocket Server with STOMP
// pom.xml: spring-boot-starter-websocket, jjwt

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import org.springframework.web.socket.config.annotation.*;

import java.security.Principal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}

@Controller
@EnableScheduling
public class WebSocketController {

    private final SimpMessagingTemplate messagingTemplate;
    private final Map<String, Set<String>> userSessions = new ConcurrentHashMap<>();

    public WebSocketController(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    @MessageMapping("/chat")
    public void handleChat(@Payload ChatMessage message, Principal principal) {
        String fromUserId = principal.getName();
        ChatMessage outgoing = new ChatMessage(
            "chat", fromUserId, message.getText(), new Date()
        );
        // Route to recipient's personal queue
        messagingTemplate.convertAndSendToUser(
            message.getRecipientId(), "/queue/messages", outgoing
        );
    }

    @MessageMapping("/presence")
    public void handlePresence(@Payload PresenceMessage message, Principal principal) {
        String userId = principal.getName();
        messagingTemplate.convertAndSendToUser(
            userId, "/queue/status",
            new StatusMessage(userId, message.getStatus())
        );
    }

    // Heartbeat: Spring STOMP handles ping/pong at the protocol level
    @Scheduled(fixedDelay = 30000)
    public void sendHeartbeat() {
        messagingTemplate.convertAndSend("/topic/heartbeat", Map.of("type", "ping"));
    }

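    // recipientId is populated from the inbound JSON payload; outgoing messages leave it null
    public record ChatMessage(String type, String from, String text, Date timestamp, String recipientId) {
        // Convenience constructor used by handleChat for outgoing messages
        public ChatMessage(String type, String from, String text, Date timestamp) {
            this(type, from, text, timestamp, null);
        }
        public String getRecipientId() { return recipientId; }
        public String getText() { return text; }
    }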
    public record PresenceMessage(String status) {
        public String getStatus() { return status; }
    }
    public record StatusMessage(String userId, String status) {}
}

# FastAPI WebSocket Server
# pip install fastapi uvicorn websockets python-jose

import asyncio
import json
from collections import defaultdict
from datetime import datetime
from typing import Dict, Set

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, status
from jose import jwt, JWTError

app = FastAPI()

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

# Track active connections per user
user_connections: Dict[str, Set[WebSocket]] = defaultdict(set)


def verify_token(token: str) -> dict:
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
    # Authenticate before accepting
    try:
        payload = verify_token(token)
        user_id = payload["sub"]
    except JWTError:
        await websocket.close(code=4001)
        return

    await websocket.accept()
    user_connections[user_id].add(websocket)
    print(f"User {user_id} connected")

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)

            if message["type"] == "chat":
                await broadcast_to_user(
                    message["recipientId"],
                    {
                        "type": "chat",
                        "from": user_id,
                        "text": message["text"],
                        "timestamp": datetime.utcnow().isoformat(),
                    },
                )
            elif message["type"] == "presence":
                await broadcast_to_user(
                    user_id,
                    {"type": "user_status", "userId": user_id, "status": message["status"]},
                )
            else:
                await websocket.send_text(json.dumps({"error": "Unknown message type"}))

    except WebSocketDisconnect:
        user_connections[user_id].discard(websocket)
        if not user_connections[user_id]:
            del user_connections[user_id]
        print(f"User {user_id} disconnected")
    except Exception as e:
        print(f"WebSocket error for user {user_id}: {e}")
        user_connections[user_id].discard(websocket)


async def broadcast_to_user(user_id: str, message: dict):
    connections = user_connections.get(user_id, set())
    dead = set()
    # Iterate over a snapshot: the set can change while we await a send
    for ws in list(connections):
        try:
            await ws.send_text(json.dumps(message))
        except Exception:
            dead.add(ws)
    connections -= dead


# Heartbeat task
async def heartbeat_task():
    while True:
        await asyncio.sleep(30)
        for user_id, connections in list(user_connections.items()):
            dead = set()
            # Snapshot again for the same reason as in broadcast_to_user
            for ws in list(connections):
                try:
                    await ws.send_text(json.dumps({"type": "ping"}))
                except Exception:
                    dead.add(ws)
            connections -= dead


@app.on_event("startup")
async def startup():
    asyncio.create_task(heartbeat_task())

// ASP.NET Core WebSocket Server
// No extra packages needed — built-in WebSocket support

using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.UseWebSockets(new WebSocketOptions { KeepAliveInterval = TimeSpan.FromSeconds(30) });

// Track connections per user
var userConnections = new ConcurrentDictionary<string, ConcurrentBag<WebSocket>>();

app.Map("/ws", async (HttpContext context) =>
{
    if (!context.WebSockets.IsWebSocketRequest)
    {
        context.Response.StatusCode = 400;
        return;
    }

    var token = context.Request.Query["token"].ToString();
    var userId = ValidateToken(token); // implement your JWT validation
    if (userId == null)
    {
        context.Response.StatusCode = 401;
        return;
    }

    var ws = await context.WebSockets.AcceptWebSocketAsync();
    userConnections.GetOrAdd(userId, _ => new ConcurrentBag<WebSocket>()).Add(ws);
    Console.WriteLine($"User {userId} connected");

    var buffer = new byte[4096];
    try
    {
        while (ws.State == WebSocketState.Open)
        {
            var result = await ws.ReceiveAsync(buffer, CancellationToken.None);
            if (result.MessageType == WebSocketMessageType.Close)
                break;

            var json = Encoding.UTF8.GetString(buffer, 0, result.Count);
            var message = JsonSerializer.Deserialize<JsonElement>(json);
            var type = message.GetProperty("type").GetString();

            if (type == "chat")
            {
                var recipientId = message.GetProperty("recipientId").GetString()!;
                var outgoing = JsonSerializer.Serialize(new
                {
                    type = "chat",
                    from = userId,
                    text = message.GetProperty("text").GetString(),
                    timestamp = DateTime.UtcNow,
                });
                await BroadcastToUser(userConnections, recipientId, outgoing);
            }
        }
    }
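    finally
    {
        // ConcurrentBag doesn't support remove; swap to a cleaner structure in production
        // so that closed sockets are actually dropped from the registry
        Console.WriteLine($"User {userId} disconnected");
        // CloseAsync throws on an aborted or already-closed socket, so check the state first
        if (ws.State is WebSocketState.Open or WebSocketState.CloseReceived)
            await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Bye", CancellationToken.None);
    }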
});

static async Task BroadcastToUser(
    ConcurrentDictionary<string, ConcurrentBag<WebSocket>> connections,
    string userId, string message)
{
    if (!connections.TryGetValue(userId, out var sockets)) return;
    var bytes = Encoding.UTF8.GetBytes(message);
    foreach (var ws in sockets)
    {
        if (ws.State == WebSocketState.Open)
            await ws.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None);
    }
}

static string? ValidateToken(string token)
{
    // Implement JWT validation; return userId or null
    return null;
}

app.Run();

WebSocket Client (Browser)

class WebSocketClient {
  private ws: WebSocket | null = null;
  private url: string;
  private token: string;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectDelay = 1000;

  constructor(url: string, token: string) {
    this.url = url;
    this.token = token;
  }

  connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      try {
        this.ws = new WebSocket(
          `${this.url}?token=${encodeURIComponent(this.token)}`,
        );

        this.ws.onopen = () => {
          console.log('Connected');
          this.reconnectAttempts = 0;
          resolve();
        };

        this.ws.onmessage = (event) => {
          const message = JSON.parse(event.data);
          this.handleMessage(message);
        };

        this.ws.onerror = (event) => {
          console.error('WebSocket error:', event);
          reject(new Error('Connection failed'));
        };

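        this.ws.onclose = (event) => {
          console.log('Disconnected');
          // Code 1000 is a normal closure (for example from disconnect()); retry only on other codes
          if (event.code !== 1000) {
            this.attemptReconnect();
          }
        };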
      } catch (err) {
        reject(err);
      }
    });
  }

  send(message: any) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      console.warn('WebSocket not connected');
    }
  }

  private attemptReconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
      console.log(`Reconnecting in ${delay}ms...`);
      setTimeout(() => this.connect().catch(console.error), delay);
    }
  }

  private handleMessage(message: any) {
    switch (message.type) {
      case 'chat':
        console.log(`Message from ${message.from}: ${message.text}`);
        break;
      default:
        console.log('Received:', message);
    }
  }

  disconnect() {
    if (this.ws) {
      this.ws.close(1000, 'Normal closure');
    }
  }
}

// Usage
const client = new WebSocketClient(
  'ws://localhost:3000/ws',
  'your-jwt-token',
);
await client.connect();
client.send({ type: 'chat', recipientId: 'user123', text: 'Hello!' });

// Java WebSocket Client using Jakarta WebSocket API
// Maven: jakarta.websocket-api, tyrus-standalone-client

import jakarta.websocket.*;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@ClientEndpoint
public class WebSocketClient {

    private Session session;
    private final String url;
    private final String token;
    private int reconnectAttempts = 0;
    private final int maxReconnectAttempts = 5;
    private final long reconnectDelayMs = 1000;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public WebSocketClient(String url, String token) {
        this.url = url;
        this.token = token;
    }

    public CompletableFuture<Void> connect() {
        var future = new CompletableFuture<Void>();
        try {
            var container = ContainerProvider.getWebSocketContainer();
            container.connectToServer(this, URI.create(url + "?token=" + token));
            future.complete(null);
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        this.reconnectAttempts = 0;
        System.out.println("Connected");
    }

    @OnMessage
    public void onMessage(String messageJson) {
        // Parse and handle message
        System.out.println("Received: " + messageJson);
    }

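    @OnClose
    public void onClose(Session session, CloseReason reason) {
        System.out.println("Disconnected: " + reason);
        // Skip reconnecting after a normal closure, such as one triggered by disconnect()
        if (reason.getCloseCode().getCode() != CloseReason.CloseCodes.NORMAL_CLOSURE.getCode()) {
            attemptReconnect();
        }
    }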

    @OnError
    public void onError(Session session, Throwable error) {
        System.err.println("WebSocket error: " + error.getMessage());
    }

    public void send(String messageJson) {
        if (session != null && session.isOpen()) {
            session.getAsyncRemote().sendText(messageJson);
        } else {
            System.out.println("WebSocket not connected");
        }
    }

    private void attemptReconnect() {
        if (reconnectAttempts < maxReconnectAttempts) {
            long delay = reconnectDelayMs * (1L << reconnectAttempts);
            reconnectAttempts++;
            System.out.println("Reconnecting in " + delay + "ms...");
            scheduler.schedule(() -> connect(), delay, TimeUnit.MILLISECONDS);
        }
    }

    public void disconnect() {
        if (session != null && session.isOpen()) {
            try { session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Normal closure")); }
            catch (Exception ignored) {}
        }
    }
}

// Usage
var client = new WebSocketClient("ws://localhost:3000/ws", "your-jwt-token");
client.connect().join();
client.send("{\"type\":\"chat\",\"recipientId\":\"user123\",\"text\":\"Hello!\"}");

# Python WebSocket Client
# pip install websockets

import asyncio
import json
import math
from urllib.parse import quote

import websockets
from websockets.exceptions import ConnectionClosed


class WebSocketClient:
    def __init__(self, url: str, token: str):
        self.url = url
        self.token = token
        self.ws = None
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5
        self.reconnect_delay = 1.0

    async def connect(self):
        uri = f"{self.url}?token={quote(self.token)}"
        self.ws = await websockets.connect(uri)
        self.reconnect_attempts = 0
        print("Connected")
        asyncio.create_task(self._receive_loop())

    async def _receive_loop(self):
        try:
            async for raw in self.ws:
                message = json.loads(raw)
                self._handle_message(message)
        except ConnectionClosed:
            print("Disconnected")
            await self._attempt_reconnect()

    def _handle_message(self, message: dict):
        if message.get("type") == "chat":
            print(f"Message from {message['from']}: {message['text']}")
        else:
            print("Received:", message)

    async def send(self, message: dict):
        if self.ws and not self.ws.closed:
            await self.ws.send(json.dumps(message))
        else:
            print("WebSocket not connected")

    async def _attempt_reconnect(self):
        if self.reconnect_attempts < self.max_reconnect_attempts:
            # reconnect_delay is in seconds (1.0 s base, doubling on each attempt)
            delay = self.reconnect_delay * (2 ** self.reconnect_attempts)
            self.reconnect_attempts += 1
            print(f"Reconnecting in {delay * 1000:.0f}ms...")
            await asyncio.sleep(delay)
            await self.connect()

    async def disconnect(self):
        if self.ws:
            await self.ws.close(1000, "Normal closure")


# Usage
async def main():
    client = WebSocketClient("ws://localhost:3000/ws", "your-jwt-token")
    await client.connect()
    await client.send({"type": "chat", "recipientId": "user123", "text": "Hello!"})
    await asyncio.sleep(5)
    await client.disconnect()

asyncio.run(main())

// C# WebSocket Client (.NET 8)
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

public class WebSocketClient
{
    private ClientWebSocket? _ws;
    private readonly string _url;
    private readonly string _token;
    private int _reconnectAttempts = 0;
    private const int MaxReconnectAttempts = 5;
    private const int ReconnectDelayMs = 1000;

    public WebSocketClient(string url, string token)
    {
        _url = url;
        _token = token;
    }

    public async Task ConnectAsync()
    {
        _ws = new ClientWebSocket();
        await _ws.ConnectAsync(new Uri($"{_url}?token={Uri.EscapeDataString(_token)}"), CancellationToken.None);
        _reconnectAttempts = 0;
        Console.WriteLine("Connected");
        _ = ReceiveLoopAsync();
    }

    private async Task ReceiveLoopAsync()
    {
        var buffer = new byte[4096];
        try
        {
            while (_ws!.State == WebSocketState.Open)
            {
                var result = await _ws.ReceiveAsync(buffer, CancellationToken.None);
                if (result.MessageType == WebSocketMessageType.Close) break;

                var json = Encoding.UTF8.GetString(buffer, 0, result.Count);
                var message = JsonSerializer.Deserialize<JsonElement>(json);
                HandleMessage(message);
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"WebSocket error: {ex.Message}");
        }
        Console.WriteLine("Disconnected");
        await AttemptReconnectAsync();
    }

    private void HandleMessage(JsonElement message)
    {
        var type = message.GetProperty("type").GetString();
        if (type == "chat")
            Console.WriteLine($"Message from {message.GetProperty("from").GetString()}: {message.GetProperty("text").GetString()}");
        else
            Console.WriteLine($"Received: {message}");
    }

    public async Task SendAsync(object message)
    {
        if (_ws?.State == WebSocketState.Open)
        {
            var json = JsonSerializer.Serialize(message);
            await _ws.SendAsync(Encoding.UTF8.GetBytes(json), WebSocketMessageType.Text, true, CancellationToken.None);
        }
        else
        {
            Console.WriteLine("WebSocket not connected");
        }
    }

    private async Task AttemptReconnectAsync()
    {
        if (_reconnectAttempts < MaxReconnectAttempts)
        {
            var delay = ReconnectDelayMs * (int)Math.Pow(2, _reconnectAttempts);
            _reconnectAttempts++;
            Console.WriteLine($"Reconnecting in {delay}ms...");
            await Task.Delay(delay);
            await ConnectAsync();
        }
    }

    public async Task DisconnectAsync()
    {
        if (_ws != null)
            await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Normal closure", CancellationToken.None);
    }
}

// Usage
var client = new WebSocketClient("ws://localhost:3000/ws", "your-jwt-token");
await client.ConnectAsync();
await client.SendAsync(new { type = "chat", recipientId = "user123", text = "Hello!" });
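
The clients above all retry with exponential backoff starting at one second. The delay calculation can be isolated as a small pure function, sketched below. The 30-second cap and the jitter variant are assumptions added for this example and are not part of the clients above; jitter is a common way to keep many clients from reconnecting at the same instant after a server restart.

```typescript
// Delay in milliseconds before reconnect attempt `attempt` (1-based):
// 1000, 2000, 4000, ... capped at maxMs.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt - 1));
}

// Jittered variant: a random delay in [0, backoffDelay).
// `random` is injectable so the function can be tested deterministically.
function backoffDelayWithJitter(
  attempt: number,
  baseMs = 1000,
  maxMs = 30000,
  random: () => number = Math.random,
): number {
  return Math.floor(random() * backoffDelay(attempt, baseMs, maxMs));
}

// Delays for the five attempts the clients above allow
const delays = [1, 2, 3, 4, 5].map((attempt) => backoffDelay(attempt));
console.log(delays); // [ 1000, 2000, 4000, 8000, 16000 ]
```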

Server-Sent Events (SSE)

EventSource API

SSE uses the browser’s native EventSource API. The server sends HTTP response headers:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

It then streams events as UTF-8 text. Each event is a group of field lines terminated by a blank line:

event: message
data: {"type":"chat","from":"user1","text":"Hello"}

event: notification
data: New notification arrived

: This is a comment (ignored by client)

data: {"action":"update","timestamp":"2025-04-20T10:30:00Z"}
retry: 5000

Fields:

  • event: Custom event type (JavaScript addEventListener will fire this event).
  • data: Payload (can be JSON or plain text).
  • id: Event ID. The browser remembers the most recent one and sends it back in the Last-Event-ID request header when it reconnects, so the server can resume from that point.
  • retry: Milliseconds before browser automatically reconnects.
  • Comments: Lines starting with : are ignored.
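
The wire format is simple enough to write and read by hand. The sketch below serializes an event and parses a stream chunk back into events. It is a simplified illustration, not a full implementation of the parsing rules: the SseEvent type and both function names are made up for this example, only LF line endings are handled, and the input is assumed to be a complete chunk rather than an incremental stream.

```typescript
interface SseEvent {
  event?: string;
  id?: string;
  retry?: number;
  data: string;
}

// Serialize one event. Multi-line data becomes one "data:" line per line,
// and the trailing blank line marks the end of the event.
function formatSseEvent(e: SseEvent): string {
  const lines: string[] = [];
  if (e.event !== undefined) lines.push(`event: ${e.event}`);
  if (e.id !== undefined) lines.push(`id: ${e.id}`);
  if (e.retry !== undefined) lines.push(`retry: ${e.retry}`);
  for (const line of e.data.split("\n")) lines.push(`data: ${line}`);
  return lines.join("\n") + "\n\n";
}

// Parse a complete chunk of stream text back into events.
function parseSseStream(text: string): SseEvent[] {
  const events: SseEvent[] = [];
  for (const block of text.split("\n\n")) {
    const current: SseEvent = { data: "" };
    const dataLines: string[] = [];
    for (const line of block.split("\n")) {
      if (line === "" || line.startsWith(":")) continue; // blank line or comment
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1); // one leading space is dropped
      if (field === "data") dataLines.push(value);
      else if (field === "event") current.event = value;
      else if (field === "id") current.id = value;
      else if (field === "retry" && /^\d+$/.test(value)) current.retry = Number(value);
    }
    // A block without any data lines (for example a lone comment) produces no event
    if (dataLines.length > 0) {
      current.data = dataLines.join("\n");
      events.push(current);
    }
  }
  return events;
}

const wire = formatSseEvent({ event: "notification", id: "42", data: "line one\nline two" });
const parsedEvents = parseSseStream(": heartbeat\n\n" + wire);
console.log(parsedEvents.length, parsedEvents[0].event); // 1 notification
```

Note that the heartbeat comment is consumed silently, which is why comment lines are a convenient keep-alive: they keep bytes flowing through proxies without firing any handler on the client.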

Implementing SSE with Express

import express from 'express';
import { verifyToken } from './auth';

const app = express();
app.use(express.json()); // Parse JSON bodies so req.body is populated in POST /notify

interface SSEClient {
  userId: string;
  res: express.Response;
  lastEventId?: string;
}

const sseClients = new Map<string, Set<SSEClient>>();

// SSE endpoint
app.get('/events', (req, res) => {
  const token = req.query.token as string;

  if (!token) {
    res.status(401).json({ error: 'Token required' });
    return;
  }

  try {
    const decoded = verifyToken(token);
    const userId = decoded.sub;

    // Set SSE headers
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.setHeader('Access-Control-Allow-Origin', '*');

    // Support reconnection with Last-Event-ID
    const lastEventId = req.headers['last-event-id'] as string | undefined;

    const client: SSEClient = { userId, res, lastEventId };

    // Track client
    if (!sseClients.has(userId)) {
      sseClients.set(userId, new Set());
    }
    sseClients.get(userId)!.add(client);

    console.log(`SSE client connected: ${userId}`);

    // Send initial heartbeat
    res.write(': connected\n\n');

    // Handle client disconnect
    res.on('close', () => {
      const clients = sseClients.get(userId);
      if (clients) {
        clients.delete(client);
        if (clients.size === 0) {
          sseClients.delete(userId);
        }
      }
      console.log(`SSE client disconnected: ${userId}`);
    });

    res.on('error', (err) => {
      console.error(`SSE error for user ${userId}:`, err);
      res.end();
    });
  } catch (err) {
    res.status(401).json({ error: 'Invalid token' });
  }
});

// Broadcast function
function broadcastEventToUser(userId: string, eventType: string, data: any, eventId?: string) {
  const clients = sseClients.get(userId);
  if (clients) {
    clients.forEach((client) => {
      if (!client.res.closed) {
        client.res.write(`event: ${eventType}\n`);
        if (eventId) {
          client.res.write(`id: ${eventId}\n`);
        }
        client.res.write(`data: ${JSON.stringify(data)}\n\n`);
      }
    });
  }
}

// Example: Broadcast a notification
app.post('/notify', (req, res) => {
  const { userId, message } = req.body;
  broadcastEventToUser(userId, 'notification', message);
  res.json({ sent: true });
});

// Heartbeat to prevent proxy timeout
setInterval(() => {
  sseClients.forEach((clients) => {
    clients.forEach((client) => {
      if (!client.res.closed) {
        client.res.write(': heartbeat\n\n');
      }
    });
  });
}, 30000);

app.listen(3000, () => {
  console.log('SSE server on port 3000');
});

// Spring Boot SSE Server using SseEmitter
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@RestController
@RequestMapping
public class SseController {

    private final SseService sseService;

    public SseController(SseService sseService) {
        this.sseService = sseService;
    }

    @GetMapping("/events")
    public SseEmitter subscribe(@RequestParam String token) {
        String userId = validateToken(token); // implement JWT validation
        if (userId == null) throw new SecurityException("Invalid token");

        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        sseService.addClient(userId, emitter);

        emitter.onCompletion(() -> sseService.removeClient(userId, emitter));
        emitter.onTimeout(() -> sseService.removeClient(userId, emitter));
        emitter.onError(e -> sseService.removeClient(userId, emitter));

        try {
            emitter.send(SseEmitter.event().comment("connected"));
        } catch (IOException e) {
            emitter.completeWithError(e);
        }

        return emitter;
    }

    @PostMapping("/notify")
    public Map<String, Boolean> notify(@RequestBody Map<String, Object> body) {
        String userId = (String) body.get("userId");
        Object message = body.get("message");
        sseService.broadcastToUser(userId, "notification", message);
        return Map.of("sent", true);
    }

    private String validateToken(String token) { return null; /* implement */ }
}

@Service
class SseService {

    private final Map<String, Set<SseEmitter>> clients = new ConcurrentHashMap<>();

    public void addClient(String userId, SseEmitter emitter) {
        // A concurrent key set can be iterated safely while other threads add or remove emitters
        clients.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet()).add(emitter);
        System.out.println("SSE client connected: " + userId);
    }

    public void removeClient(String userId, SseEmitter emitter) {
        Set<SseEmitter> set = clients.get(userId);
        if (set != null) {
            set.remove(emitter);
            if (set.isEmpty()) clients.remove(userId);
        }
        System.out.println("SSE client disconnected: " + userId);
    }

    public void broadcastToUser(String userId, String eventType, Object data) {
        Set<SseEmitter> emitters = clients.getOrDefault(userId, Set.of());
        Set<SseEmitter> dead = new HashSet<>();
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event().name(eventType).data(data));
            } catch (IOException e) {
                dead.add(emitter);
            }
        }
        dead.forEach(e -> removeClient(userId, e));
    }

    // Requires @EnableScheduling on a configuration class
    @Scheduled(fixedDelay = 30000)
    public void sendHeartbeat() {
        clients.forEach((userId, emitters) -> {
            Set<SseEmitter> dead = new HashSet<>();
            for (SseEmitter emitter : emitters) {
                try {
                    emitter.send(SseEmitter.event().comment("heartbeat"));
                } catch (IOException e) {
                    dead.add(emitter);
                }
            }
            dead.forEach(e -> removeClient(userId, e));
        });
    }
}
# FastAPI SSE Server
# pip install fastapi uvicorn sse-starlette python-jose

import asyncio
import json
from collections import defaultdict
from typing import AsyncGenerator, Dict, Set

from fastapi import FastAPI, Query, Request
from fastapi.responses import JSONResponse
from jose import jwt, JWTError
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

sse_clients: Dict[str, Set[asyncio.Queue]] = defaultdict(set)


def verify_token(token: str) -> dict:
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])


@app.get("/events")
async def sse_endpoint(request: Request, token: str = Query(...)):
    try:
        payload = verify_token(token)
        user_id = payload["sub"]
    except (JWTError, KeyError):  # KeyError: the token has no "sub" claim
        return JSONResponse({"error": "Invalid token"}, status_code=401)

    queue: asyncio.Queue = asyncio.Queue()
    sse_clients[user_id].add(queue)
    print(f"SSE client connected: {user_id}")

    async def event_generator() -> AsyncGenerator:
        yield {"event": "connected", "data": ""}
        try:
            while True:
                if await request.is_disconnected():
                    break
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=30)
                    yield event
                except asyncio.TimeoutError:
                    # Heartbeat
                    yield {"comment": "heartbeat"}
        finally:
            sse_clients[user_id].discard(queue)
            if not sse_clients[user_id]:
                del sse_clients[user_id]
            print(f"SSE client disconnected: {user_id}")

    return EventSourceResponse(event_generator())


@app.post("/notify")
async def notify(body: dict):
    user_id = body["userId"]
    message = body["message"]
    await broadcast_to_user(user_id, "notification", message)
    return {"sent": True}


async def broadcast_to_user(user_id: str, event_type: str, data, event_id: str = None):
    queues = list(sse_clients.get(user_id, []))
    event = {"event": event_type, "data": json.dumps(data)}
    if event_id:
        event["id"] = event_id
    for queue in queues:
        await queue.put(event)
// ASP.NET Core SSE Server (.NET 8)
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

var sseClients = new ConcurrentDictionary<string, ConcurrentBag<HttpResponse>>();

app.MapGet("/events", async (HttpContext context, string token) =>
{
    var userId = ValidateToken(token);
    if (userId == null) { context.Response.StatusCode = 401; return; }

    context.Response.Headers["Content-Type"] = "text/event-stream";
    context.Response.Headers["Cache-Control"] = "no-cache";
    context.Response.Headers["Connection"] = "keep-alive";
    context.Response.Headers["Access-Control-Allow-Origin"] = "*";

    var res = context.Response;
    sseClients.GetOrAdd(userId, _ => new ConcurrentBag<HttpResponse>()).Add(res);
    Console.WriteLine($"SSE client connected: {userId}");

    await res.WriteAsync(": connected\n\n");
    await res.Body.FlushAsync();

    try
    {
        // Keep connection open until client disconnects
        var tcs = new TaskCompletionSource();
        context.RequestAborted.Register(() => tcs.TrySetResult());
        await tcs.Task;
    }
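    finally
    {
        // NOTE: ConcurrentBag cannot remove a specific item, so this response stays in
        // sseClients after disconnect and later writes to it throw (swallowed by the
        // catch blocks). In production, store responses in a
        // ConcurrentDictionary<HttpResponse, byte> and call TryRemove here.
        Console.WriteLine($"SSE client disconnected: {userId}");
    }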
});

app.MapPost("/notify", async (HttpContext context) =>
{
    var body = await JsonSerializer.DeserializeAsync<JsonElement>(context.Request.Body);
    var userId = body.GetProperty("userId").GetString()!;
    var message = body.GetProperty("message");
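    // GetRawText() keeps the payload as valid JSON (strings stay quoted) so clients can JSON.parse it
    await BroadcastToUser(sseClients, userId, "notification", message.GetRawText());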
    await context.Response.WriteAsJsonAsync(new { sent = true });
});

// Heartbeat background service
_ = Task.Run(async () =>
{
    while (true)
    {
        await Task.Delay(30000);
        foreach (var (userId, responses) in sseClients)
        {
            foreach (var res in responses)
            {
                try { await res.WriteAsync(": heartbeat\n\n"); await res.Body.FlushAsync(); }
                catch { /* client disconnected */ }
            }
        }
    }
});

static async Task BroadcastToUser(
    ConcurrentDictionary<string, ConcurrentBag<HttpResponse>> clients,
    string userId, string eventType, string data, string? eventId = null)
{
    if (!clients.TryGetValue(userId, out var responses)) return;
    var sb = new StringBuilder();
    sb.Append($"event: {eventType}\n");
    if (eventId != null) sb.Append($"id: {eventId}\n");
    sb.Append($"data: {data}\n\n");
    var payload = sb.ToString();
    foreach (var res in responses)
    {
        try { await res.WriteAsync(payload); await res.Body.FlushAsync(); }
        catch { /* client disconnected */ }
    }
}

static string? ValidateToken(string token) => null; // implement JWT validation

app.Run();

SSE Client

class SSEClient {
  private eventSource: EventSource | null = null;
  private url: string;
  private token: string;
  private reconnectAttempts = 0;

  constructor(url: string, token: string) {
    this.url = url;
    this.token = token;
  }

  connect() {
    const url = `${this.url}?token=${encodeURIComponent(this.token)}`;
    this.eventSource = new EventSource(url);

    this.eventSource.addEventListener('notification', (event) => {
      const data = JSON.parse(event.data);
      console.log('Notification:', data);
    });

    this.eventSource.addEventListener('message', (event) => {
      const data = JSON.parse(event.data);
      console.log('Message:', data);
    });

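    this.eventSource.onopen = () => {
      // Connection established: reset the backoff counter
      this.reconnectAttempts = 0;
      console.log('SSE connected');
    };

    this.eventSource.onerror = (err) => {
      console.error('EventSource error:', err);
      // The browser retries on its own while readyState is CONNECTING;
      // reconnect manually only once it has given up (CLOSED).
      if (this.eventSource?.readyState === EventSource.CLOSED) {
        this.attemptReconnect();
      }
    };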
  }

  private attemptReconnect() {
    this.reconnectAttempts++;
    const delay = 1000 * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5));
    console.log(`Reconnecting in ${delay}ms...`);
    setTimeout(() => this.connect(), delay);
  }

  disconnect() {
    if (this.eventSource) {
      this.eventSource.close();
    }
  }
}

// Usage
const client = new SSEClient('http://localhost:3000/events', 'your-jwt-token');
client.connect();
// Java SSE Client using OkHttp
// Maven: com.squareup.okhttp3:okhttp

import okhttp3.*;
import okio.BufferedSource;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class SseClient {

    private final OkHttpClient httpClient;
    private final String url;
    private final String token;
    private int reconnectAttempts = 0;
    private final int maxReconnectAttempts = 5;

    public SseClient(String url, String token) {
        this.url = url;
        this.token = token;
        this.httpClient = new OkHttpClient.Builder()
                .readTimeout(0, TimeUnit.MILLISECONDS) // No timeout for streaming
                .build();
    }

    public void connect() {
        Request request = new Request.Builder()
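                // URL-encode the token so characters such as '+' or '=' survive the query string
                .url(url + "?token=" + java.net.URLEncoder.encode(token, java.nio.charset.StandardCharsets.UTF_8))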
                .build();

        httpClient.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                System.err.println("SSE connection failed: " + e.getMessage());
                attemptReconnect();
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                reconnectAttempts = 0;
                System.out.println("SSE connected");
                try (ResponseBody body = response.body()) {
                    if (body == null) return;
                    BufferedSource source = body.source();
                    String eventType = "";
                    while (!source.exhausted()) {
                        String line = source.readUtf8Line();
                        if (line == null) break;
                        if (line.startsWith("event:")) {
                            eventType = line.substring(6).trim();
                        } else if (line.startsWith("data:")) {
                            String data = line.substring(5).trim();
                            handleEvent(eventType, data);
                            eventType = "";
                        }
                    }
                }
                attemptReconnect();
            }
        });
    }

    private void handleEvent(String eventType, String data) {
        switch (eventType) {
            case "notification" -> System.out.println("Notification: " + data);
            case "message" -> System.out.println("Message: " + data);
            default -> System.out.println("Event [" + eventType + "]: " + data);
        }
    }

    private void attemptReconnect() {
        if (reconnectAttempts < maxReconnectAttempts) {
            long delay = 1000L * (1L << Math.min(reconnectAttempts, 5));
            reconnectAttempts++;
            System.out.println("Reconnecting in " + delay + "ms...");
            try { Thread.sleep(delay); } catch (InterruptedException ignored) {}
            connect();
        }
    }

    public void disconnect() {
        httpClient.dispatcher().cancelAll();
    }
}

// Usage
var client = new SseClient("http://localhost:3000/events", "your-jwt-token");
client.connect();
# Python SSE Client
# pip install httpx

import asyncio
import json

import httpx


class SSEClient:
    def __init__(self, url: str, token: str):
        self.url = url
        self.token = token
        self.reconnect_attempts = 0

    async def connect(self):
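        try:
            async with httpx.AsyncClient(timeout=None) as client:
                # Passing the token via params lets httpx URL-encode it
                async with client.stream("GET", self.url, params={"token": self.token}) as response:
                    response.raise_for_status()
                    print("SSE connected")  # log only after the server accepts the stream
                    self.reconnect_attempts = 0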
                    event_type = ""
                    async for line in response.aiter_lines():
                        if line.startswith("event:"):
                            event_type = line[6:].strip()
                        elif line.startswith("data:"):
                            data = line[5:].strip()
                            self._handle_event(event_type, data)
                            event_type = ""
        except Exception as e:
            print(f"EventSource error: {e}")
            await self._attempt_reconnect()

    def _handle_event(self, event_type: str, data: str):
        parsed = json.loads(data) if data.startswith("{") else data
        if event_type == "notification":
            print("Notification:", parsed)
        elif event_type == "message":
            print("Message:", parsed)
        else:
            print(f"Event [{event_type}]:", parsed)

    async def _attempt_reconnect(self):
        delay = 2 ** min(self.reconnect_attempts, 5)  # seconds: 1, 2, 4, ... capped at 32
        self.reconnect_attempts += 1
        print(f"Reconnecting in {delay:.0f}s...")
        await asyncio.sleep(delay)
        await self.connect()


# Usage
async def main():
    client = SSEClient("http://localhost:3000/events", "your-jwt-token")
    await client.connect()

asyncio.run(main())
// C# SSE Client (.NET 8)
using System.Net.Http.Headers;
using System.Text.Json;

public class SseClient
{
    private readonly HttpClient _httpClient;
    private readonly string _url;
    private readonly string _token;
    private int _reconnectAttempts = 0;
    private const int MaxReconnectAttempts = 5;

    public SseClient(string url, string token)
    {
        _url = url;
        _token = token;
        _httpClient = new HttpClient { Timeout = System.Threading.Timeout.InfiniteTimeSpan };
    }

    public async Task ConnectAsync(CancellationToken cancellationToken = default)
    {
        var uri = $"{_url}?token={Uri.EscapeDataString(_token)}";

        try
        {
            using var response = await _httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
            response.EnsureSuccessStatusCode();
            // Log only once the server has accepted the stream
            Console.WriteLine("SSE connected");
            _reconnectAttempts = 0;

            using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
            using var reader = new StreamReader(stream);

            string eventType = "";
            while (!reader.EndOfStream && !cancellationToken.IsCancellationRequested)
            {
                var line = await reader.ReadLineAsync(cancellationToken);
                if (line == null) break;

                if (line.StartsWith("event:"))
                    eventType = line[6..].Trim();
                else if (line.StartsWith("data:"))
                {
                    var data = line[5..].Trim();
                    HandleEvent(eventType, data);
                    eventType = "";
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"EventSource error: {ex.Message}");
            await AttemptReconnectAsync(cancellationToken);
        }
    }

    private void HandleEvent(string eventType, string data)
    {
        switch (eventType)
        {
            case "notification":
                Console.WriteLine($"Notification: {data}");
                break;
            case "message":
                Console.WriteLine($"Message: {data}");
                break;
            default:
                Console.WriteLine($"Event [{eventType}]: {data}");
                break;
        }
    }

    private async Task AttemptReconnectAsync(CancellationToken cancellationToken)
    {
        if (_reconnectAttempts < MaxReconnectAttempts)
        {
            var delay = 1000 * (int)Math.Pow(2, Math.Min(_reconnectAttempts, 5));
            _reconnectAttempts++;
            Console.WriteLine($"Reconnecting in {delay}ms...");
            await Task.Delay(delay, cancellationToken);
            await ConnectAsync(cancellationToken);
        }
    }
}

// Usage
var client = new SseClient("http://localhost:3000/events", "your-jwt-token");
await client.ConnectAsync();
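
The hand-rolled Java, Python, and C# clients above dispatch on every data: line, which is a simplification. In the SSE wire format an event ends at a blank line, may carry several data: lines, and lines starting with a colon are comments. The following is a minimal sketch of a line-oriented parser along those lines; the SseParser class and its method names are illustrative, not part of any library, and the retry field is ignored here.

```typescript
interface SseEvent {
  event: string;
  data: string;
  id: string | null;
}

class SseParser {
  private eventType = '';
  private dataLines: string[] = [];
  private lastEventId: string | null = null;

  // Feed one line without its terminator. Returns an event when a blank
  // line completes one, otherwise null.
  pushLine(line: string): SseEvent | null {
    if (line === '') {
      return this.dispatch();
    }
    if (line.startsWith(':')) {
      return null; // comment line, e.g. ": heartbeat"
    }
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) {
      value = value.slice(1); // only a single leading space is stripped
    }
    if (field === 'event') {
      this.eventType = value;
    } else if (field === 'data') {
      this.dataLines.push(value);
    } else if (field === 'id') {
      this.lastEventId = value; // persists across events until replaced
    }
    return null;
  }

  private dispatch(): SseEvent | null {
    const hadData = this.dataLines.length > 0;
    const event: SseEvent = {
      event: this.eventType === '' ? 'message' : this.eventType,
      data: this.dataLines.join('\n'),
      id: this.lastEventId,
    };
    this.eventType = '';
    this.dataLines = [];
    return hadData ? event : null; // a blank line without data dispatches nothing
  }
}
```

A client would call pushLine for each line read from the response stream and hand every non-null result to its event handler.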

Authentication for Persistent Connections

Token in Query Parameter

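Pros: Simple, and works with the browser WebSocket and EventSource APIs, which cannot set custom request headers. Cons: The token can leak through server and proxy access logs and browser history, and is readable in transit unless the connection uses TLS (wss:// or https://).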

const token = url.searchParams.get('token');
const decoded = verifyToken(token);

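Token in Cookie

Pros: Sent automatically with the handshake or SSE request; an HttpOnly cookie is not readable by page scripts. Cons: Cross-origin SSE needs withCredentials and a credentialed CORS policy, and WebSocket handshakes are not covered by CORS, so the server must check the Origin header to prevent cross-site WebSocket hijacking.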

const token = req.cookies.auth_token;
const decoded = verifyToken(token);

Token in First Message

Pros: Clean separation, supports bidirectional auth. Cons: Window of unauthenticated connection.

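ws.on('message', (data) => {
  if (!ws.authenticated) {
    try {
      const { token } = JSON.parse(data.toString());
      // verifyToken is assumed to throw on an invalid or expired token
      verifyToken(token);
      ws.authenticated = true;
    } catch {
      // Malformed JSON and bad tokens are rejected the same way
      ws.close(4001, 'Authentication failed');
    }
    return;
  }
  // Process authenticated message
});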
// Spring WebSocket TextWebSocketHandler
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    if (!Boolean.TRUE.equals(session.getAttributes().get("authenticated"))) {
        JsonNode payload = objectMapper.readTree(message.getPayload());
        String token = payload.path("token").asText(); // path() avoids an NPE when the field is missing
        boolean valid = jwtService.validateToken(token);
        session.getAttributes().put("authenticated", valid);
        if (!valid) {
            // Use the same application close code (4001) as the other examples
            session.close(new CloseStatus(4001, "Authentication failed"));
        }
        return;
    }
    // Process authenticated message
}
# FastAPI WebSocket: authenticate via first message
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    authenticated = False
    async for data in websocket.iter_text():
        if not authenticated:
            try:
                # Parse inside the try so malformed JSON is rejected rather than raised
                payload = json.loads(data)
                verify_token(payload.get("token"))
                authenticated = True
            except Exception:
                await websocket.close(code=4001, reason="Authentication failed")
                return
            continue
        # Process authenticated message
        await handle_message(websocket, data)
// ASP.NET Core WebSocket: authenticate via first message
public async Task HandleAsync(WebSocket ws, CancellationToken ct)
{
    bool authenticated = false;
    var buffer = new byte[4096];
    while (ws.State == WebSocketState.Open)
    {
        var result = await ws.ReceiveAsync(buffer, ct);
        var text = Encoding.UTF8.GetString(buffer, 0, result.Count);
        if (!authenticated)
        {
            var payload = JsonSerializer.Deserialize<AuthMessage>(text);
            if (!_jwtService.ValidateToken(payload?.Token))
            {
                await ws.CloseAsync(
                    (WebSocketCloseStatus)4001, "Authentication failed", ct);
                return;
            }
            authenticated = true;
            continue;
        }
        // Process authenticated message
        await HandleMessageAsync(ws, text, ct);
    }
}
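
The "window of unauthenticated connection" noted above is usually bounded with a deadline: a socket that has not authenticated within a few seconds is closed. The sketch below models that as a small state machine that is independent of any WebSocket library. FirstMessageAuthGate, the 5-second default, and the verify callback are all illustrative assumptions; a server could call isExpired from its heartbeat sweep and close expired sockets with code 4001.

```typescript
type GateResult = 'authenticated' | 'rejected' | 'forward';

class FirstMessageAuthGate {
  private authenticated = false;
  private readonly verify: (token: string) => boolean;
  private readonly connectedAt: number;
  private readonly deadlineMs: number;

  constructor(verify: (token: string) => boolean, connectedAt: number, deadlineMs = 5_000) {
    this.verify = verify;
    this.connectedAt = connectedAt;
    this.deadlineMs = deadlineMs;
  }

  // True when the client is still unauthenticated after the deadline.
  isExpired(now: number): boolean {
    return !this.authenticated && now - this.connectedAt > this.deadlineMs;
  }

  // Decide what to do with an incoming text frame.
  handle(raw: string): GateResult {
    if (this.authenticated) {
      return 'forward'; // already authenticated: pass to the application
    }
    let token: unknown;
    try {
      const parsed: unknown = JSON.parse(raw);
      if (typeof parsed === 'object' && parsed !== null && 'token' in parsed) {
        token = (parsed as { token: unknown }).token;
      }
    } catch {
      return 'rejected'; // malformed JSON counts as a failed attempt
    }
    if (typeof token === 'string' && this.verify(token)) {
      this.authenticated = true;
      return 'authenticated';
    }
    return 'rejected';
  }
}
```

Keeping the decision logic free of socket calls makes it straightforward to unit-test and to reuse across the server frameworks shown above.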

Production Recommendation: Prefer secure cookies (HttpOnly, Secure, SameSite). If the token must travel in the WebSocket or SSE URL, make it short-lived and single-use so that a copy leaked through logs is useless.
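
One way to keep a URL token short-lived is to exchange the long-lived credential for a one-time ticket over a normal authenticated HTTP request, then put only the ticket in the connection URL. The sketch below is an in-memory illustration under stated assumptions: TicketStore, the 30-second default lifetime, and the injected newId generator are hypothetical names, and a multi-server deployment would need a shared store instead of a Map.

```typescript
interface Ticket {
  userId: string;
  expiresAt: number;
}

class TicketStore {
  private readonly tickets = new Map<string, Ticket>();
  private readonly newId: () => string;
  private readonly ttlMs: number;

  // newId must return an unguessable value, for example () => crypto.randomUUID()
  constructor(newId: () => string, ttlMs = 30_000) {
    this.newId = newId;
    this.ttlMs = ttlMs;
  }

  // Called from an authenticated HTTP endpoint before the client connects.
  issue(userId: string, now: number = Date.now()): string {
    const id = this.newId();
    this.tickets.set(id, { userId, expiresAt: now + this.ttlMs });
    return id;
  }

  // Called during the WebSocket or SSE handshake. The ticket is deleted on
  // first use, so a replayed or logged URL cannot be redeemed again.
  redeem(id: string, now: number = Date.now()): string | null {
    const ticket = this.tickets.get(id);
    this.tickets.delete(id);
    if (ticket === undefined || now > ticket.expiresAt) {
      return null;
    }
    return ticket.userId;
  }
}
```

The handshake handler would call redeem with the value from the query string and reject the connection when it returns null.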


Heartbeat and Ping-Pong

Why Heartbeat?

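Long-lived connections may be terminated silently by proxies, firewalls, or load balancers, typically after an idle timeout. A heartbeat keeps traffic flowing so the connection is not treated as idle, and lets each side detect a dead peer promptly instead of discovering it on the next send.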

WebSocket Ping-Pong

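WebSocket has built-in PING (0x9) and PONG (0xA) control frames. Either endpoint may send a PING, and the receiver must answer with a PONG; in practice the server pings and the client answers. Browsers reply automatically and do not expose ping or pong to JavaScript.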

// Server: Send ping every 30 seconds
setInterval(() => {
  wss.clients.forEach((ws: AuthenticatedWebSocket) => {
    if (ws.isAlive === false) {
      ws.terminate();
      return;
    }
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

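// Browsers answer PING with PONG automatically; no client-side handler is needed.
// The server must reset the flag when the PONG arrives: ws.on('pong', () => { ws.isAlive = true; })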
// Spring WebSocket: scheduled server-side ping sent on each raw WebSocketSession
@Scheduled(fixedRate = 30000)
public void sendHeartbeat() {
    for (WebSocketSession session : activeSessions.values()) {
        if (!session.isOpen()) {
            activeSessions.remove(session.getId());
            continue;
        }
        try {
            session.sendMessage(new PingMessage());
        } catch (IOException e) {
            log.warn("Ping failed for session {}", session.getId());
            activeSessions.remove(session.getId());
        }
    }
}
// Pong replies are delivered to handlePongMessage(...) on AbstractWebSocketHandler; override it to track liveness
# FastAPI: application-level ping message every 30 seconds
# (Starlette's WebSocket API does not expose protocol-level PING frames)
import asyncio

async def websocket_with_heartbeat(websocket: WebSocket):
    await websocket.accept()
    async def ping_task():
        while True:
            await asyncio.sleep(30)
            try:
                await websocket.send_text('{"type":"ping"}')
            except Exception:
                break

    task = asyncio.create_task(ping_task())
    try:
        async for message in websocket.iter_text():
            await handle_message(websocket, message)
    finally:
        task.cancel()
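// ASP.NET Core: application-level ping message every 30 seconds
// (protocol-level keep-alive pings are configured via WebSocketOptions.KeepAliveInterval)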
private async Task HeartbeatAsync(WebSocket ws, CancellationToken ct)
{
    using var timer = new PeriodicTimer(TimeSpan.FromSeconds(30));
    while (await timer.WaitForNextTickAsync(ct) && ws.State == WebSocketState.Open)
    {
        var ping = Encoding.UTF8.GetBytes("{\"type\":\"ping\"}");
        await ws.SendAsync(ping, WebSocketMessageType.Text, true, ct);
    }
}
// Call Task.WhenAny(ReceiveLoopAsync(...), HeartbeatAsync(...)) to run both
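
To make the PING (0x9) and PONG (0xA) opcodes concrete, the sketch below builds the raw bytes of an unmasked control frame as a server would send it. It is an illustration of the frame layout only: buildControlFrame is a hypothetical helper, libraries such as ws do this internally, and frames sent by a client must additionally be masked, which is not shown.

```typescript
const OPCODE_PING = 0x9;
const OPCODE_PONG = 0xa;

// Build an unmasked (server-to-client) control frame.
function buildControlFrame(opcode: number, payload: Uint8Array = new Uint8Array(0)): Uint8Array {
  if (payload.length > 125) {
    // Control frames carry at most 125 bytes and cannot be fragmented
    throw new RangeError('Control frame payloads are limited to 125 bytes');
  }
  const frame = new Uint8Array(2 + payload.length);
  frame[0] = 0x80 | opcode;  // FIN bit set, then the 4-bit opcode
  frame[1] = payload.length; // MASK bit clear, 7-bit payload length
  frame.set(payload, 2);
  return frame;
}

// An empty PING is two bytes: 0x89 0x00
const emptyPing = buildControlFrame(OPCODE_PING);

// A PONG echoes the application data of the PING it answers
const pingPayload = new Uint8Array([1, 2]);
const matchingPong = buildControlFrame(OPCODE_PONG, pingPayload);
```

Because a heartbeat is only a couple of bytes on the wire, a 30-second interval adds negligible bandwidth even with many open connections.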

SSE Heartbeat

SSE has no ping frame. Send a comment line (one that starts with a colon), which clients ignore, or a custom event:

setInterval(() => {
  sseClients.forEach((clients) => {
    clients.forEach((client) => {
      client.res.write(': heartbeat\n\n');
    });
  });
}, 30000);
// Spring: SSE heartbeat via SseEmitter
@Scheduled(fixedRate = 30000)
public void sendSseHeartbeat() {
    List<SseEmitter> dead = new ArrayList<>();
    for (SseEmitter emitter : sseEmitters) {
        try {
            emitter.send(SseEmitter.event().comment("heartbeat"));
        } catch (IOException e) {
            dead.add(emitter);
        }
    }
    sseEmitters.removeAll(dead);
}
# FastAPI: SSE heartbeat via comment frame
from fastapi.responses import StreamingResponse

async def sse_with_heartbeat():
    while True:
        yield ": heartbeat\n\n"
        await asyncio.sleep(30)

@app.get("/events")
async def sse_endpoint():
    return StreamingResponse(sse_with_heartbeat(), media_type="text/event-stream")
// ASP.NET Core: SSE heartbeat via comment frame
private async IAsyncEnumerable<string> SseWithHeartbeatAsync(
    [EnumeratorCancellation] CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        yield return ": heartbeat\n\n";
        await Task.Delay(TimeSpan.FromSeconds(30), ct);
    }
}
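
The comment frames and named events used throughout this guide share one text format: optional event: and id: lines, one data: line per line of payload, and a blank line to finish. A minimal sketch of helpers that produce that format follows; formatSseEvent and formatSseComment are illustrative names, not part of any framework.

```typescript
interface OutgoingEvent {
  event?: string;
  id?: string;
  data: string;
}

// Serialize one event. Multi-line payloads become several data: lines,
// because a raw newline inside a data line would end the field early.
function formatSseEvent(message: OutgoingEvent): string {
  let out = '';
  if (message.event) {
    out += `event: ${message.event}\n`;
  }
  if (message.id) {
    out += `id: ${message.id}\n`;
  }
  for (const line of message.data.split(/\r\n|\r|\n/)) {
    out += `data: ${line}\n`;
  }
  return out + '\n'; // the blank line dispatches the event
}

// Serialize a comment frame; text is assumed to contain no newlines.
function formatSseComment(text: string): string {
  return `: ${text}\n\n`;
}
```

A heartbeat then becomes res.write(formatSseComment('heartbeat')), and a broadcast becomes res.write(formatSseEvent({ event, id, data: JSON.stringify(payload) })).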

Reconnection Strategies

Exponential Backoff

Retry with exponentially increasing delays, capped at a maximum, so that a struggling server is not hammered by immediate retries:

private attemptReconnect() {
  const maxDelay = 60000; // 60 seconds max
  const delay = Math.min(
    this.reconnectDelay * Math.pow(2, this.reconnectAttempts),
    maxDelay,
  );
  this.reconnectAttempts++;
  
  console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
  setTimeout(() => this.connect(), delay);
}
// Exponential backoff reconnect
private void attemptReconnect() {
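    long maxDelay = 60_000L;
    // Compute in double and cap before casting: a long product can overflow
    // to a negative delay once reconnectAttempts grows large
    long delay = (long) Math.min(
        reconnectDelay * Math.pow(2, reconnectAttempts),
        (double) maxDelay
    );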
    reconnectAttempts++;
    log.info("Reconnecting in {}ms (attempt {})", delay, reconnectAttempts);
    scheduler.schedule(this::connect, delay, TimeUnit.MILLISECONDS);
}
# Exponential backoff reconnect
async def attempt_reconnect(self):
    max_delay = 60.0  # seconds
    delay = min(self.reconnect_delay * (2 ** self.reconnect_attempts), max_delay)
    self.reconnect_attempts += 1
    print(f"Reconnecting in {delay:.1f}s (attempt {self.reconnect_attempts})")
    await asyncio.sleep(delay)
    await self.connect()
// Exponential backoff reconnect
private async Task AttemptReconnectAsync(CancellationToken ct)
{
    const double maxDelayMs = 60_000;
    double delay = Math.Min(
        _reconnectDelay * Math.Pow(2, _reconnectAttempts),
        maxDelayMs
    );
    _reconnectAttempts++;
    _logger.LogInformation("Reconnecting in {Delay}ms (attempt {Attempt})", delay, _reconnectAttempts);
    await Task.Delay(TimeSpan.FromMilliseconds(delay), ct);
    await ConnectAsync(ct);
}
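
As a worked example of the formula used in the snippets above, the sketch below computes the delay for the first eight attempts. The 1000 ms base is an assumption (the value of reconnectDelay is not shown here); the 60-second cap matches the snippets.

```typescript
// delay = min(base * 2^attempt, max)
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 60_000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

const schedule = Array.from({ length: 8 }, (_, attempt) => backoffDelay(attempt));
// schedule: [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

The cap takes effect at the seventh attempt, where the uncapped value would be 64000 ms. Resetting the attempt counter after a successful connection keeps the next outage from starting at the maximum delay.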

Jitter

Add randomness so that clients disconnected at the same moment do not all reconnect in lockstep (the thundering herd problem):

const jitter = Math.random() * 1000;
const delayWithJitter = delay + jitter;
setTimeout(() => this.connect(), delayWithJitter);
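
The snippet above adds up to one second of jitter on top of the backoff delay, so at long delays the clients remain fairly tightly clustered. A common alternative, often called full jitter, draws the whole delay uniformly between zero and the backoff ceiling. The sketch below shows that variant rather than the additive one; fullJitterDelay and its defaults are illustrative, and the random source is injectable only so that the function can be tested deterministically.

```typescript
// Full jitter: pick a delay uniformly in [0, min(base * 2^attempt, max)).
function fullJitterDelay(
  attempt: number,
  baseMs = 1000,
  maxMs = 60_000,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(baseMs * 2 ** attempt, maxMs);
  return Math.floor(random() * ceiling);
}
```

A client would use it as setTimeout(() => this.connect(), fullJitterDelay(this.reconnectAttempts)). The trade-off is that an individual client may occasionally retry almost immediately, in exchange for spreading the fleet's retries across the whole window.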

State Preservation

Track the ID of the last event received so that, on reconnect, the server can replay anything the client missed while it was disconnected:

class RobustSSEClient {
  private messageBuffer: any[] = [];
  private lastEventId: string | null = null;

  private handleMessage(event: MessageEvent) {
    this.messageBuffer.push(JSON.parse(event.data));
    this.lastEventId = event.lastEventId || this.lastEventId;
    // Process message...
  }

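  private reconnect() {
    const params = new URLSearchParams({ token: this.token });
    if (this.lastEventId) {
      // The browser sends the Last-Event-ID header only on its own automatic
      // retries. A new EventSource starts without it, so pass the checkpoint
      // explicitly (the lastEventId query parameter is an assumed server contract).
      params.set('lastEventId', this.lastEventId);
    }
    this.eventSource = new EventSource(`${this.url}?${params}`);
  }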
}
// Spring WebClient: SSE client that resumes from the last event ID
public class RobustSseClient {
    private final List<Object> messageBuffer = new ArrayList<>();
    private String lastEventId = null;

    public void handleMessage(String data, String eventId) {
        messageBuffer.add(parseJson(data));
        if (eventId != null) lastEventId = eventId;
        // Process message...
    }

    public void reconnect(String url, String token) {
        WebClient.Builder builder = WebClient.builder().baseUrl(url);
        if (lastEventId != null) {
            builder.defaultHeader("Last-Event-ID", lastEventId);
        }
        builder.build()
            .get()
            .uri(uriBuilder -> uriBuilder.queryParam("token", token).build())
            .retrieve()
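            // Typed reference so event.data() is a String without an unchecked cast
            // (import org.springframework.core.ParameterizedTypeReference)
            .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})
            .subscribe(event -> handleMessage(event.data(), event.id()));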
    }
}
# Python: SSE client with Last-Event-ID resume
import json

import httpx

class RobustSseClient:
    def __init__(self, url: str, token: str):
        self.url = url
        self.token = token
        self.message_buffer: list = []
        self.last_event_id: str | None = None

    async def connect(self):
        headers = {}
        if self.last_event_id:
            headers["Last-Event-ID"] = self.last_event_id
        # timeout=None: the default 5-second read timeout would abort an idle SSE stream
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream(
                "GET", self.url,
                params={"token": self.token},
                headers=headers,
            ) as response:
                async for line in response.aiter_lines():
                    if line.startswith("id:"):
                        self.last_event_id = line[3:].strip()
                    elif line.startswith("data:"):
                        data = json.loads(line[5:].strip())
                        self.message_buffer.append(data)
                        # Process message...
// C# (HttpClient): SSE client with Last-Event-ID resume
public class RobustSseClient
{
    private readonly HttpClient _http;
    private readonly string _url;
    private readonly string _token;
    private readonly List<object> _messageBuffer = new();
    private string? _lastEventId;

    public async Task ConnectAsync(CancellationToken ct)
    {
        using var request = new HttpRequestMessage(HttpMethod.Get,
            $"{_url}?token={Uri.EscapeDataString(_token)}");
        if (_lastEventId is not null)
            request.Headers.TryAddWithoutValidation("Last-Event-ID", _lastEventId);

        using var response = await _http.SendAsync(
            request, HttpCompletionOption.ResponseHeadersRead, ct);
        using var reader = new StreamReader(await response.Content.ReadAsStreamAsync(ct));

        string? line;
        while ((line = await reader.ReadLineAsync()) is not null)
        {
            if (line.StartsWith("id:")) _lastEventId = line[3..].Trim();
            else if (line.StartsWith("data:"))
            {
                var data = JsonSerializer.Deserialize<object>(line[5..].Trim());
                _messageBuffer.Add(data!);
                // Process message...
            }
        }
    }
}
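
The clients above only record and send the last event ID; resuming also needs a server that remembers recent events. The following is a minimal in-memory sketch of that server side, assuming numeric, monotonically increasing event IDs. ReplayBuffer, its capacity default, and its method names are hypothetical, and with several server instances the buffer would have to live in shared storage.

```typescript
interface StoredEvent {
  id: number;
  event: string;
  data: string;
}

class ReplayBuffer {
  private events: StoredEvent[] = [];
  private nextId = 1;
  private readonly capacity: number;

  constructor(capacity = 1000) {
    this.capacity = capacity;
  }

  // Record an event and assign it the next ID; the oldest event is
  // dropped once the buffer is full.
  append(event: string, data: string): StoredEvent {
    const stored: StoredEvent = { id: this.nextId++, event, data };
    this.events.push(stored);
    if (this.events.length > this.capacity) {
      this.events.shift();
    }
    return stored;
  }

  // Events newer than the ID a reconnecting client reports. A missing or
  // non-numeric ID is treated as a fresh connection with nothing to replay.
  since(lastEventId: string | undefined): StoredEvent[] {
    if (!lastEventId) {
      return [];
    }
    const last = Number(lastEventId);
    if (!Number.isInteger(last)) {
      return [];
    }
    return this.events.filter((stored) => stored.id > last);
  }
}
```

On a new /events request the handler would read the Last-Event-ID header, write the result of since(...) to the response first, and then register the client for live events. Events older than the buffer's capacity cannot be replayed, so clients that were offline for a long time still need a full state refresh.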

Scaling with Redis Pub/Sub

When deploying across multiple servers, WebSocket connections on Server A can’t directly message connections on Server B. Redis pub/sub acts as a message broker:

import { createClient } from 'redis';

// A connection in subscriber mode cannot issue other commands, so use two clients
const redisClient = createClient();
const redisSubscriber = createClient();

await redisClient.connect();
await redisSubscriber.connect();

const userConnections = new Map<string, Set<AuthenticatedWebSocket>>();

wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
  const userId = (req as any).userId;
  ws.userId = userId;

  if (!userConnections.has(userId)) {
    userConnections.set(userId, new Set());
  }
  userConnections.get(userId)!.add(ws);

  ws.on('message', async (data) => {
    const message = JSON.parse(data.toString());

    if (message.type === 'chat') {
      // Publish to Redis so other servers can route to this user
      await redisClient.publish(`user:${message.recipientId}:messages`, JSON.stringify({
        from: userId,
        text: message.text,
        timestamp: new Date(),
      }));
    }
  });
});

// Subscribe to user-specific channels
async function subscribeToUser(userId: string) {
  await redisSubscriber.subscribe(`user:${userId}:messages`, (message) => {
    const parsedMessage = JSON.parse(message);
    const connections = userConnections.get(userId);
    
    if (connections) {
      connections.forEach((ws) => {
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(JSON.stringify({
            type: 'chat',
            from: parsedMessage.from,
            text: parsedMessage.text,
          }));
        }
      });
    }
  });
}

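wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
  const userId = (req as any).userId;
  // Subscribe once per user, on the first local connection. Subscribing per
  // connection would register duplicate listeners and deliver each message twice.
  if (userConnections.get(userId)?.size === 1) {
    void subscribeToUser(userId);
  }

  ws.on('close', async () => {
    const connections = userConnections.get(userId);
    connections?.delete(ws);
    // Last local connection for this user: drop the Redis subscription
    if (connections?.size === 0) {
      userConnections.delete(userId);
      await redisSubscriber.unsubscribe(`user:${userId}:messages`);
    }
  });
});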
// Spring Boot WebSocket + Redis Pub/Sub
// pom.xml: spring-boot-starter-data-redis, spring-boot-starter-websocket

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class RedisWebSocketBridge {

    private final StringRedisTemplate redisTemplate;
    private final SimpMessagingTemplate wsTemplate;
    private final RedisMessageListenerContainer listenerContainer;
    private final Map<String, MessageListener> listeners = new ConcurrentHashMap<>();

    public RedisWebSocketBridge(
            StringRedisTemplate redisTemplate,
            SimpMessagingTemplate wsTemplate,
            RedisMessageListenerContainer listenerContainer) {
        this.redisTemplate = redisTemplate;
        this.wsTemplate = wsTemplate;
        this.listenerContainer = listenerContainer;
    }

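    // Called when a user connects
    public void subscribeToUser(String userId) {
        String channel = "user:" + userId + ":messages";
        MessageListener listener = (Message message, byte[] pattern) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset
            String payload = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            // Route Redis message to the user's WebSocket queue
            wsTemplate.convertAndSendToUser(userId, "/queue/messages", payload);
        };
        // Register once per user; convertAndSendToUser already reaches all of the user's sessions
        if (listeners.putIfAbsent(userId, listener) == null) {
            // ChannelTopic matches the exact channel name; PatternTopic would treat
            // glob characters in a user ID as wildcards
            listenerContainer.addMessageListener(listener,
                new org.springframework.data.redis.listener.ChannelTopic(channel));
        }
    }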

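    // Called when a message should be sent to another user
    public void publishToUser(String recipientId, String fromUserId, String text) {
        String channel = "user:" + recipientId + ":messages";
        // Serialize with Jackson rather than String.format so that quotes and
        // backslashes in user text cannot break (or inject into) the JSON
        String payload;
        try {
            payload = new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(Map.of(
                "from", fromUserId,
                "text", text,
                "timestamp", java.time.Instant.now().toString()));
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            throw new IllegalStateException("Could not serialize chat message", e);
        }
        redisTemplate.convertAndSend(channel, payload);
    }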

    // Called when the user's last session disconnects
    public void unsubscribeFromUser(String userId) {
        MessageListener listener = listeners.remove(userId);
        if (listener != null) {
            listenerContainer.removeMessageListener(listener);
        }
    }
}
# FastAPI WebSocket + Redis Pub/Sub
# pip install fastapi uvicorn redis websockets

import asyncio
import json
import time
from collections import defaultdict
from typing import Dict, Set

import redis.asyncio as aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query

app = FastAPI()
redis_client = aioredis.from_url("redis://localhost")
redis_subscriber = aioredis.from_url("redis://localhost")

user_connections: Dict[str, Set[WebSocket]] = defaultdict(set)
# One subscriber task per user; it fans out to all of that user's sockets
subscribe_tasks: Dict[str, asyncio.Task] = {}


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
    user_id = validate_token(token)  # implement JWT validation
    if not user_id:
        await websocket.close(code=4001)
        return

    await websocket.accept()
    user_connections[user_id].add(websocket)

    # Subscribe to this user's Redis channel on the first connection only;
    # one task per connection would deliver every message more than once
    if user_id not in subscribe_tasks:
        subscribe_tasks[user_id] = asyncio.create_task(subscribe_to_user(user_id))

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)

            if message.get("type") == "chat":
                # Publish to Redis; any server can pick this up
                await redis_client.publish(
                    f"user:{message['recipientId']}:messages",
                    json.dumps({
                        "from": user_id,
                        "text": message["text"],
                        # Wall-clock epoch seconds; the event loop clock is
                        # monotonic and not comparable across servers
                        "timestamp": time.time(),
                    }),
                )
    except WebSocketDisconnect:
        pass
    finally:
        user_connections[user_id].discard(websocket)
        # Stop the subscriber once the user's last connection is gone
        if not user_connections[user_id]:
            task = subscribe_tasks.pop(user_id, None)
            if task:
                task.cancel()


async def subscribe_to_user(user_id: str):
    channel = f"user:{user_id}:messages"
    async with redis_subscriber.pubsub() as pubsub:
        await pubsub.subscribe(channel)
        async for raw_message in pubsub.listen():
            if raw_message["type"] != "message":
                continue
            payload = json.loads(raw_message["data"])
            for ws in list(user_connections.get(user_id, [])):
                try:
                    await ws.send_text(json.dumps({
                        "type": "chat",
                        "from": payload["from"],
                        "text": payload["text"],
                    }))
                except Exception:
                    user_connections[user_id].discard(ws)


def validate_token(token: str):
    return None  # implement JWT validation
// ASP.NET Core WebSocket + Redis Pub/Sub (.NET 8)
// NuGet: StackExchange.Redis, Microsoft.AspNetCore.WebSockets

using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using StackExchange.Redis;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<IConnectionMultiplexer>(
    ConnectionMultiplexer.Connect("localhost:6379"));

var app = builder.Build();
app.UseWebSockets();

var userConnections = new ConcurrentDictionary<string, ConcurrentBag<WebSocket>>();
var redis = app.Services.GetRequiredService<IConnectionMultiplexer>();
var subscriber = redis.GetSubscriber();

app.Map("/ws", async (HttpContext context) =>
{
    if (!context.WebSockets.IsWebSocketRequest) { context.Response.StatusCode = 400; return; }

    var token = context.Request.Query["token"].ToString();
    var userId = ValidateToken(token);
    if (userId == null) { context.Response.StatusCode = 401; return; }

    var ws = await context.WebSockets.AcceptWebSocketAsync();
    userConnections.GetOrAdd(userId, _ => new ConcurrentBag<WebSocket>()).Add(ws);

    // Subscribe to this user's Redis channel. Keep a reference to the handler so
    // that disconnecting removes only this connection's subscription
    Action<RedisChannel, RedisValue> handler = async (channel, value) =>
    {
        var payload = JsonSerializer.Deserialize<JsonElement>(value.ToString());
        var outgoing = JsonSerializer.Serialize(new
        {
            type = "chat",
            from = payload.GetProperty("from").GetString(),
            text = payload.GetProperty("text").GetString(),
        });
        if (ws.State == WebSocketState.Open)
            await ws.SendAsync(Encoding.UTF8.GetBytes(outgoing),
                WebSocketMessageType.Text, true, CancellationToken.None);
    };
    await subscriber.SubscribeAsync($"user:{userId}:messages", handler);

    var buffer = new byte[4096];
    var db = redis.GetDatabase();
    try
    {
        while (ws.State == WebSocketState.Open)
        {
            var result = await ws.ReceiveAsync(buffer, CancellationToken.None);
            if (result.MessageType == WebSocketMessageType.Close) break;

            var json = Encoding.UTF8.GetString(buffer, 0, result.Count);
            var message = JsonSerializer.Deserialize<JsonElement>(json);

            if (message.GetProperty("type").GetString() == "chat")
            {
                var recipientId = message.GetProperty("recipientId").GetString()!;
                var pubPayload = JsonSerializer.Serialize(new
                {
                    from = userId,
                    text = message.GetProperty("text").GetString(),
                    timestamp = DateTime.UtcNow,
                });
                // Publish; any server subscribed to this channel will receive it
                await db.PublishAsync($"user:{recipientId}:messages", pubPayload);
            }
        }
    }
    finally
    {
        // Passing the handler leaves the user's other connections subscribed;
        // unsubscribing by channel alone would remove every handler
        await subscriber.UnsubscribeAsync($"user:{userId}:messages", handler);
    }
});
});

static string? ValidateToken(string token) => null; // implement JWT validation

app.Run();

Pattern Summary:

  1. Client sends message to Server A.
  2. Server A publishes to Redis channel user:RECIPIENT:messages.
  3. Server B (where recipient is connected) receives the message from Redis.
  4. Server B routes the message to the recipient’s WebSocket connection.
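
The four steps above can be traced without Redis. The sketch below is an illustration only: `InMemoryBroker` and `ChatServer` are hypothetical stand-ins for Redis Pub/Sub and a WebSocket server, and the `delivered` array takes the place of `ws.send`.

```typescript
// Hypothetical in-memory stand-in for Redis Pub/Sub (illustration only).
type Handler = (message: string) => void;

class InMemoryBroker {
  private channels = new Map<string, Set<Handler>>();

  subscribe(channel: string, handler: Handler): void {
    if (!this.channels.has(channel)) this.channels.set(channel, new Set());
    this.channels.get(channel)!.add(handler);
  }

  // Returns the number of subscribers that received the message.
  publish(channel: string, message: string): number {
    const handlers = this.channels.get(channel);
    if (!handlers) return 0;
    handlers.forEach((h) => h(message));
    return handlers.size;
  }
}

// Hypothetical server node: it knows only its own locally connected users.
class ChatServer {
  readonly delivered: string[] = [];
  private name: string;
  private broker: InMemoryBroker;

  constructor(name: string, broker: InMemoryBroker) {
    this.name = name;
    this.broker = broker;
  }

  // Steps 3 and 4: subscribe for a locally connected user and route
  // incoming broker messages to that user's "socket".
  connectUser(userId: string): void {
    this.broker.subscribe(`user:${userId}:messages`, (raw) => {
      const { from, text } = JSON.parse(raw) as { from: string; text: string };
      this.delivered.push(`${this.name} -> ${userId}: [${from}] ${text}`);
    });
  }

  // Steps 1 and 2: a client message arrives here and is published.
  handleClientMessage(fromUserId: string, recipientId: string, text: string): number {
    return this.broker.publish(
      `user:${recipientId}:messages`,
      JSON.stringify({ from: fromUserId, text }),
    );
  }
}

const broker = new InMemoryBroker();
const serverA = new ChatServer("A", broker);
const serverB = new ChatServer("B", broker);

serverA.connectUser("alice");
serverB.connectUser("bob");

// alice (on A) messages bob (on B); A never touches bob's socket directly.
const receivers = serverA.handleClientMessage("alice", "bob", "hi");
console.log(receivers, serverB.delivered);
```

Server A needs no knowledge of where the recipient is connected; the channel name is the only shared contract between the servers.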

Backpressure and Flow Control

The Problem

If a client sends messages faster than the server processes them, the internal buffer grows, consuming memory. Similarly, if the client can’t receive messages (slow network), the server’s send buffer fills.

WebSocket Backpressure

ws.on('message', async (data) => {
  // Parse inside the try block so malformed JSON cannot cause an unhandled rejection
  try {
    const message = JSON.parse(data.toString());

    // Process asynchronously; don't block the event loop
    await processMessage(message);
  } catch (err) {
    console.error('Error processing message:', err);
  }
});

// Check write buffer before sending
function sendMessage(ws: WebSocket, data: any) {
  const serialized = JSON.stringify(data);
  
  if (ws.bufferedAmount > 16 * 1024) {
    console.warn('Write buffer exceeded 16KB; apply backpressure');
    // Option 1: Drop message (acceptable for real-time streams)
    // Option 2: Queue and retry later
    // Option 3: Pause reading from client
    return false;
  }

  ws.send(serialized);
  return true;
}
// Spring WebSocket backpressure with bounded queue
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

@Component
public class BackpressureWebSocketSender {

    private static final int MAX_QUEUE_SIZE = 1000;
    private static final int WARN_THRESHOLD = 800;

    private final SimpMessagingTemplate messagingTemplate;

    public BackpressureWebSocketSender(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    // Per-user bounded queue to absorb bursts and apply backpressure
    private final java.util.concurrent.ConcurrentHashMap<String, BlockingQueue<Object>>
        userQueues = new java.util.concurrent.ConcurrentHashMap<>();

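    // Producer side: enqueue without blocking; returns false when the message is dropped
    public boolean sendToUser(String userId, Object message) {
        BlockingQueue<Object> queue = userQueues.computeIfAbsent(
            userId, k -> new ArrayBlockingQueue<>(MAX_QUEUE_SIZE)
        );

        if (queue.size() >= WARN_THRESHOLD) {
            System.out.println("Backpressure warning for user " + userId
                + ": queue=" + queue.size());
        }

        boolean accepted = queue.offer(message); // non-blocking; returns false if full
        if (!accepted) {
            System.out.println("Send queue full for user " + userId + "; dropping message");
        }
        return accepted;
    }

    // Consumer side: run from a single worker (for example a scheduled task).
    // Draining inside sendToUser would empty the queue on every call, so it
    // could never fill up and signal backpressure
    public void drain(String userId) {
        BlockingQueue<Object> queue = userQueues.get(userId);
        if (queue == null) return;

        Object pending;
        while ((pending = queue.poll()) != null) {
            messagingTemplate.convertAndSendToUser(userId, "/queue/messages", pending);
        }
    }
}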
}
# FastAPI WebSocket backpressure with asyncio.Queue
import asyncio
import json
from typing import Dict

MAX_QUEUE_SIZE = 1000
WARN_THRESHOLD = 800

send_queues: Dict[str, asyncio.Queue] = {}


async def send_with_backpressure(user_id: str, ws, data: dict) -> bool:
    if user_id not in send_queues:
        send_queues[user_id] = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)

    queue = send_queues[user_id]

    if queue.qsize() >= WARN_THRESHOLD:
        print(f"Backpressure warning for user {user_id}: queue={queue.qsize()}")

    try:
        queue.put_nowait(data)  # Raises QueueFull if at capacity
    except asyncio.QueueFull:
        print(f"Send queue full for user {user_id}; dropping message")
        return False

    # Drain queue
    while not queue.empty():
        item = queue.get_nowait()
        try:
            await ws.send_text(json.dumps(item))
        except Exception:
            return False

    return True


async def process_message(message: dict):
    # Simulate async processing
    await asyncio.sleep(0)
// ASP.NET Core WebSocket backpressure with Channel<T>
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;

public class BackpressureWebSocketSender
{
    private const int MaxQueueSize = 1000;
    private const int WarnThreshold = 800;

    private readonly ConcurrentDictionary<string, Channel<object>> _userChannels = new();

    public bool Enqueue(string userId, object message)
    {
        var channel = _userChannels.GetOrAdd(userId, _ =>
            Channel.CreateBounded<object>(new BoundedChannelOptions(MaxQueueSize)
            {
                FullMode = BoundedChannelFullMode.Wait, // TryWrite returns false when full, so drops are visible to the caller
                SingleWriter = false,
                SingleReader = true,
            }));

        if (channel.Reader.Count >= WarnThreshold)
            Console.WriteLine($"Backpressure warning for user {userId}: queue={channel.Reader.Count}");

        if (!channel.Writer.TryWrite(message))
        {
            Console.WriteLine($"Write buffer exceeded for user {userId}; dropping message");
            return false;
        }

        return true;
    }

    public async Task DrainToWebSocket(string userId, WebSocket ws, CancellationToken ct)
    {
        if (!_userChannels.TryGetValue(userId, out var channel)) return;

        await foreach (var item in channel.Reader.ReadAllAsync(ct))
        {
            if (ws.State != WebSocketState.Open) break;
            var json = JsonSerializer.Serialize(item);
            await ws.SendAsync(Encoding.UTF8.GetBytes(json),
                WebSocketMessageType.Text, true, ct);
        }
    }
}
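
The TypeScript `sendMessage` above lists "Queue and retry later" as an option without showing it. Here is a minimal sketch of that option, assuming a socket that exposes `bufferedAmount` and `send`. `BufferedSocket` and `OutboundQueue` are hypothetical names for this example, and the 16KB threshold and queue size are carried over from the snippets above rather than being recommended values.

```typescript
// Minimal socket shape for this sketch. This interface is an assumption
// that keeps the example self-contained; adapt it to your WebSocket type.
interface BufferedSocket {
  readonly bufferedAmount: number;
  send(data: string): void;
}

class OutboundQueue {
  private pending: string[] = [];
  private socket: BufferedSocket;
  private highWaterMark: number;
  private maxQueued: number;

  constructor(socket: BufferedSocket, highWaterMark = 16 * 1024, maxQueued = 1000) {
    this.socket = socket;
    this.highWaterMark = highWaterMark;
    this.maxQueued = maxQueued;
  }

  // Sends immediately when nothing is queued and the socket buffer has room;
  // otherwise queues. Returns false only when the message had to be dropped.
  enqueue(data: unknown): boolean {
    const serialized = JSON.stringify(data);
    if (this.pending.length === 0 && this.socket.bufferedAmount <= this.highWaterMark) {
      this.socket.send(serialized);
      return true;
    }
    if (this.pending.length >= this.maxQueued) return false; // queue full: drop newest
    this.pending.push(serialized);
    return true;
  }

  // Call periodically (for example from a timer) to retry queued messages.
  // Stops as soon as the socket buffer is above the threshold again.
  flush(): number {
    let sent = 0;
    while (this.pending.length > 0 && this.socket.bufferedAmount <= this.highWaterMark) {
      this.socket.send(this.pending.shift()!);
      sent++;
    }
    return sent;
  }

  get size(): number {
    return this.pending.length;
  }
}
```

Queued messages keep their order because `enqueue` never bypasses a non-empty queue. Whether to drop the newest or the oldest message when the queue is full depends on the stream: for live prices the newest value matters most, for chat the order does.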

SSE Backpressure

function broadcastToUser(userId: string, data: any): boolean {
  const clients = sseClients.get(userId);
  if (!clients) return true;

  let allSuccess = true;
  clients.forEach((client) => {
    const canWrite = client.res.write(`data: ${JSON.stringify(data)}\n\n`);
    if (!canWrite) {
      console.warn(`Backpressure detected for user ${userId}`);
      allSuccess = false;
      // Apply backpressure: pause reading, implement queue, etc.
    }
  });

  return allSuccess;
}
// Java SSE: treat a failed send as a dead client and complete its emitter
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Set;

public boolean broadcastToUser(String userId, String eventType, Object data,
                               java.util.Map<String, Set<SseEmitter>> clients) {
    Set<SseEmitter> emitters = clients.getOrDefault(userId, Set.of());
    boolean allSuccess = true;

    for (SseEmitter emitter : emitters) {
        try {
            emitter.send(SseEmitter.event().name(eventType).data(data));
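        } catch (IOException e) {
            // An IOException here usually means the client is gone, not merely slow
            System.out.println("Send failed for user " + userId + ": " + e.getMessage());
            allSuccess = false;
            // Complete the emitter so its completion callbacks can remove it from the set
            emitter.completeWithError(e);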
        }
    }

    return allSuccess;
}
# Python SSE backpressure — bounded asyncio.Queue per client
import asyncio
import json
from typing import Dict, Set

MAX_SSE_QUEUE = 100

# Each client gets a bounded queue; if full, backpressure is signaled
sse_queues: Dict[str, Set[asyncio.Queue]] = {}


async def broadcast_to_user(user_id: str, data: dict) -> bool:
    queues = sse_queues.get(user_id, set())
    all_success = True

    for queue in list(queues):
        try:
            queue.put_nowait({"event": "message", "data": json.dumps(data)})
        except asyncio.QueueFull:
            print(f"Backpressure detected for user {user_id}")
            all_success = False
            # Drop oldest item to make room (alternative: drop newest)
            queue.get_nowait()
            queue.put_nowait({"event": "message", "data": json.dumps(data)})

    return all_success
// C# SSE backpressure — Channel<T> per response
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;

public class SseBackpressureSender
{
    private readonly ConcurrentDictionary<string, Channel<string>> _userChannels = new();

    public bool BroadcastToUser(string userId, object data)
    {
        if (!_userChannels.TryGetValue(userId, out var channel)) return true;

        var payload = $"data: {JsonSerializer.Serialize(data)}\n\n";

        if (!channel.Writer.TryWrite(payload))
        {
            Console.WriteLine($"Backpressure detected for user {userId}");
            // Apply backpressure: pause reading, implement queue, etc.
            return false;
        }

        return true;
    }

    public Channel<string> CreateChannelForUser(string userId)
    {
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
        {
            FullMode = BoundedChannelFullMode.Wait, // WriteAsync waits for the reader; TryWrite returns false when full
        });
        _userChannels[userId] = channel;
        return channel;
    }

    public void RemoveUser(string userId) => _userChannels.TryRemove(userId, out _);
}
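
The Node.js example above detects backpressure through the return value of `res.write()` but leaves the reaction as a comment. One possible reaction is to buffer events while the stream is saturated and resume when it emits `drain`. The sketch below assumes only the two stream members it uses, described by the hypothetical `WritableLike` interface; `SseClient` and its default queue size of 100 are likewise assumptions for illustration.

```typescript
// The two members of a Node.js writable stream that this sketch relies on.
interface WritableLike {
  write(chunk: string): boolean;
  once(event: "drain", listener: () => void): unknown;
}

class SseClient {
  private queue: string[] = [];
  private waitingForDrain = false;
  private res: WritableLike;
  private maxQueued: number;

  constructor(res: WritableLike, maxQueued = 100) {
    this.res = res;
    this.maxQueued = maxQueued;
  }

  // Returns false when an event had to be dropped because the client is too slow.
  send(data: unknown): boolean {
    const frame = `data: ${JSON.stringify(data)}\n\n`;

    if (this.waitingForDrain) {
      // Stream is saturated: hold the event instead of writing it.
      if (this.queue.length >= this.maxQueued) {
        this.queue.shift(); // drop the oldest queued event to make room
        this.queue.push(frame);
        return false;
      }
      this.queue.push(frame);
      return true;
    }

    // write() returning false means "buffered, but stop writing until drain".
    if (!this.res.write(frame)) this.waitForDrain();
    return true;
  }

  private waitForDrain(): void {
    this.waitingForDrain = true;
    this.res.once("drain", () => this.onDrain());
  }

  private onDrain(): void {
    this.waitingForDrain = false;
    while (this.queue.length > 0) {
      const frame = this.queue.shift()!;
      if (!this.res.write(frame)) {
        this.waitForDrain();
        return;
      }
    }
  }
}
```

Dropping the oldest event suits streams where only recent state matters; for event logs that must be complete, closing the connection and letting the client resume with `Last-Event-ID` may be the safer choice.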

Comparison: WebSocket vs SSE vs Long Polling

| Feature | WebSocket | SSE | Long Polling |
| --- | --- | --- | --- |
| Bidirectional | Yes | No (client sends via separate HTTP requests) | No (client sends via separate HTTP requests) |
| Latency | Low (frames sent on an open connection) | Low (events sent on an open connection) | Higher (a new request after every response) |
| Connection Overhead | Minimal (2-14 byte header per frame) | Minimal (text-based) | High (new HTTP headers per request) |
| Server Push | Native | Native | Simulated (client polls) |
| Browser Support | Modern browsers | Modern browsers | All browsers |
| Proxy Compatibility | Good (needs WebSocket-aware proxies) | Excellent (standard HTTP) | Excellent (standard HTTP) |
| Scalability | Medium (persistent connections) | Medium (persistent connections) | Easy (stateless) |
| Use Case | Chat, gaming, real-time dashboards | Notifications, log streaming | Legacy systems, strict proxies |
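
The per-frame overhead for WebSocket in the table follows from the framing rules in RFC 6455: a 2-byte base header, an extended length field of 2 or 8 bytes for larger payloads, and a 4-byte masking key on frames sent by the client. The helper below, with the hypothetical name `frameHeaderBytes`, works through that arithmetic.

```typescript
// Size in bytes of a WebSocket frame header for a given payload length.
// `masked` is true for client-to-server frames, which carry a masking key.
function frameHeaderBytes(payloadLength: number, masked: boolean): number {
  if (!Number.isInteger(payloadLength) || payloadLength < 0) {
    throw new RangeError("payloadLength must be a non-negative integer");
  }
  let size = 2; // FIN/opcode byte + mask bit/7-bit length byte
  if (payloadLength > 65535) {
    size += 8; // 64-bit extended payload length
  } else if (payloadLength > 125) {
    size += 2; // 16-bit extended payload length
  }
  if (masked) size += 4; // masking key
  return size;
}

console.log(frameHeaderBytes(100, false));   // small server-to-client frame
console.log(frameHeaderBytes(70000, true));  // large client-to-server frame
```

A server pushing small JSON messages therefore pays 2 or 4 bytes of framing per message, compared with hundreds of bytes of headers for each long-polling request.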

Production Checklist

Connection Management

  • Connection Limits: Set per-server and per-user limits to prevent resource exhaustion.

    const MAX_CONNECTIONS_PER_SERVER = 10000;
    const MAX_CONNECTIONS_PER_USER = 5;
    
    if ((userConnections.get(userId)?.size ?? 0) >= MAX_CONNECTIONS_PER_USER) {
      ws.close(4029, 'Too many connections');
      return;
    }
  • Timeout Handling: Close idle connections.

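    const CONNECTION_IDLE_TIMEOUT = 5 * 60 * 1000; // 5 minutes
    
    // Arm the timer on connect as well, so a client that never sends is still closed
    const resetIdleTimer = () => {
      clearTimeout(ws.idleTimer);
      ws.idleTimer = setTimeout(() => ws.close(4000, 'Idle timeout'), CONNECTION_IDLE_TIMEOUT);
    };
    resetIdleTimer();
    ws.on('message', resetIdleTimer);
    ws.on('close', () => clearTimeout(ws.idleTimer));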
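
The connection-limit snippet above declares a per-server limit but only checks the per-user one. A small counter that enforces both could look like the following sketch; `ConnectionLimiter` is a hypothetical helper, and the defaults simply reuse the two constants above.

```typescript
class ConnectionLimiter {
  private perUser = new Map<string, number>();
  private total = 0;
  private maxPerServer: number;
  private maxPerUser: number;

  constructor(maxPerServer = 10000, maxPerUser = 5) {
    this.maxPerServer = maxPerServer;
    this.maxPerUser = maxPerUser;
  }

  // Returns true and records the connection when both limits allow it.
  tryAcquire(userId: string): boolean {
    const current = this.perUser.get(userId) ?? 0;
    if (this.total >= this.maxPerServer || current >= this.maxPerUser) return false;
    this.perUser.set(userId, current + 1);
    this.total++;
    return true;
  }

  // Call from the socket's close handler so slots are returned.
  release(userId: string): void {
    const current = this.perUser.get(userId) ?? 0;
    if (current === 0) return; // nothing recorded for this user
    if (current === 1) {
      this.perUser.delete(userId);
    } else {
      this.perUser.set(userId, current - 1);
    }
    this.total--;
  }

  get activeConnections(): number {
    return this.total;
  }
}
```

In a connection handler you would call `tryAcquire(userId)` before registering the socket, close with a code such as 4029 when it returns false, and call `release(userId)` on `close`.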

Load Balancer Configuration

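  • Sticky Sessions: A WebSocket stays on the server that accepted the upgrade for the life of the connection, so session affinity mainly matters when clients fall back to HTTP polling or reconnect to server-local state. The example below balances new connections with least_conn; use ip_hash instead if you need affinity.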
    # Nginx example
    upstream backend {
      least_conn;
      server backend1;
      server backend2;
      server backend3;
    }
    
    server {
      location /ws {
        proxy_pass http://backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_read_timeout 86400; # Long timeout for persistent connections
      }
    }

Memory Management

  • Monitor Memory Usage: Use process.memoryUsage() and set up alerts.

    setInterval(() => {
      const mem = process.memoryUsage();
      console.log(`Memory: ${(mem.heapUsed / 1024 / 1024).toFixed(2)}MB / ${(mem.heapTotal / 1024 / 1024).toFixed(2)}MB`);
      
      if (mem.heapUsed > 1024 * 1024 * 1024) { // 1GB
        console.error('Heap usage critical; rejecting new connections');
        // Reject new connections, gracefully close old ones
      }
    }, 10000);
  • Message Queue Limits: Prevent unbounded growth of pending messages.

    const MAX_MESSAGE_QUEUE = 1000;
    
    if (messageQueue.length > MAX_MESSAGE_QUEUE) {
      messageQueue.shift(); // Drop oldest
    }

Monitoring and Logging

  • Connection Metrics:

    • Active WebSocket connections
    • Connections by user/server
    • Message throughput (msg/sec)
    • Latency percentiles (p50, p95, p99)
    • Error rates and close codes
  • Structured Logging:

    logger.info('ws:connected', { userId, remoteAddress: req.socket.remoteAddress, timestamp: new Date() });
    logger.error('ws:error', { userId, error: err.message, code: err.code });
    logger.warn('ws:backpressure', { userId, bufferedAmount: ws.bufferedAmount });
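
For the latency percentiles listed above, a nearest-rank calculation over a window of samples is enough to get started. The `percentile` helper below is a hypothetical sketch; a production setup would more likely feed samples into a metrics library's histogram than sort arrays in process.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p percent
// of all samples are less than or equal to it.
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) throw new Error("no samples");
  if (p <= 0 || p > 100) throw new RangeError("p must be in (0, 100]");
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length); // 1-based rank
  return sorted[rank - 1];
}

// Example round-trip latencies in milliseconds (made-up values).
const latenciesMs = [12, 15, 11, 250, 14, 13, 18, 16, 17, 19];

console.log({
  p50: percentile(latenciesMs, 50),
  p95: percentile(latenciesMs, 95),
  p99: percentile(latenciesMs, 99),
});
```

The single 250 ms outlier leaves the median untouched but dominates p95 and p99, which is why tail percentiles are worth alerting on separately from averages.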

Graceful Shutdown

function gracefulShutdown() {
  console.log('Shutting down gracefully...');
  
  // Stop accepting new connections; exit cleanly once existing ones have closed
  server.close(() => process.exit(0));
  
  // Close all WebSocket connections
  wss.clients.forEach((ws) => {
    ws.close(1001, 'Server shutting down');
  });
  
  // Force exit only if clients have not closed within the grace period
  setTimeout(() => {
    console.log('Force shutting down');
    process.exit(1);
  }, 30000).unref(); // 30 seconds; unref() keeps the timer from holding the process open
}

process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
// Spring Boot graceful shutdown
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class GracefulWebSocketShutdown {

    private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    private final SimpMessagingTemplate messagingTemplate;

    public GracefulWebSocketShutdown(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    public void registerSession(String sessionId, WebSocketSession session) {
        sessions.put(sessionId, session);
    }

    @EventListener(ContextClosedEvent.class)
    public void onShutdown() {
        System.out.println("Shutting down gracefully...");

        // Notify all connected clients
        messagingTemplate.convertAndSend("/topic/shutdown",
            java.util.Map.of("reason", "Server shutting down"));

        // Close all sessions
        for (WebSocketSession session : sessions.values()) {
            try {
                session.close(CloseStatus.SERVICE_RESTARTED);
            } catch (IOException e) {
                System.err.println("Error closing session: " + e.getMessage());
            }
        }

        System.out.println("All WebSocket sessions closed");
    }
}
# FastAPI graceful shutdown
import asyncio
import json
import signal
from typing import Dict, Set

from fastapi import FastAPI, WebSocket

app = FastAPI()
active_websockets: Set[WebSocket] = set()


async def graceful_shutdown():
    print("Shutting down gracefully...")

    # Notify all clients
    close_tasks = []
    for ws in list(active_websockets):
        try:
            await ws.send_text(json.dumps({"type": "server_shutdown"}))
            close_tasks.append(ws.close(1001))
        except Exception:
            pass

    if close_tasks:
        await asyncio.gather(*close_tasks, return_exceptions=True)

    print("All WebSocket connections closed")


@app.on_event("shutdown")
async def shutdown_event():
    await graceful_shutdown()


# uvicorn handles SIGTERM/SIGINT and calls the shutdown event
// ASP.NET Core graceful shutdown (.NET 8)
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Hosting;

public class GracefulShutdownService : IHostedService
{
    public static readonly ConcurrentBag<WebSocket> ActiveSockets = new();

    public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        Console.WriteLine("Shutting down gracefully...");

        var shutdownMsg = JsonSerializer.Serialize(new { type = "server_shutdown" });
        var bytes = Encoding.UTF8.GetBytes(shutdownMsg);

        var tasks = ActiveSockets
            .Where(ws => ws.State == WebSocketState.Open)
            .Select(async ws =>
            {
                try
                {
                    await ws.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None);
                    await ws.CloseAsync(WebSocketCloseStatus.EndpointUnavailable,
                        "Server shutting down", CancellationToken.None);
                }
                catch { /* ignore — client may have already disconnected */ }
            });

        // Wait up to 30 seconds for graceful closure
        await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(30));
        Console.WriteLine("All WebSocket connections closed");
    }
}

// Register in Program.cs:
// builder.Services.AddHostedService<GracefulShutdownService>();

Testing

  • Connection Stability: Simulate network conditions (latency, packet loss, disconnects).
  • Load Testing: Test with thousands of concurrent connections.
  • Protocol Compliance: Verify frame structure, opcodes, and close codes.
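
As one concrete protocol-compliance check, a test suite can assert that the server only ever sends close codes an endpoint is allowed to send. Under RFC 6455, 1004 is reserved, 1005, 1006, and 1015 must never appear in a Close frame, 3000-3999 is for registered library codes, and 4000-4999 is for private use such as the 4000, 4001, and 4029 codes used in this article. The function name `isSendableCloseCode` is an assumption for this sketch.

```typescript
// True when `code` may be placed in a Close frame by an endpoint.
function isSendableCloseCode(code: number): boolean {
  if (!Number.isInteger(code)) return false;
  // 1004 is reserved; 1005 and 1006 are only reported locally, never sent.
  const reserved = code === 1004 || code === 1005 || code === 1006;
  // 1000-1014 are protocol-defined; 1015 (TLS failure) is never sent either.
  const protocolRange = code >= 1000 && code <= 1014 && !reserved;
  // 3000-3999: registered library/framework codes; 4000-4999: private use.
  const applicationRange = code >= 3000 && code <= 4999;
  return protocolRange || applicationRange;
}

for (const code of [1000, 1001, 1006, 4001, 4029]) {
  console.log(code, isSendableCloseCode(code));
}
```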

Conclusion

WebSocket and SSE are powerful tools for real-time communication. WebSocket excels for bidirectional, low-latency scenarios (chat, gaming, collaborative editing). SSE shines for simple server-push use cases (notifications, log streaming) with better proxy compatibility.

Scaling persistent connections requires coordination through a message broker, careful memory management, and production-grade monitoring. With heartbeats, reconnection strategies, load balancing, and graceful shutdown in place, both protocols can scale horizontally to very large numbers of concurrent connections.

Choose WebSocket for full bidirectionality and latency-critical apps; choose SSE for simplicity and proxy compatibility. And always measure, monitor, and test under realistic load.


Written by Palakorn Voramongkol

Software Engineer Specialist with 20+ years of experience. Writing about architecture, performance, and building production systems.
