Architecture System Design Backend Infrastructure

System Design Patterns Every Senior Engineer Should Know

พลากร วรมงคล
25 March 2025 (2568 BE), 12 min read

“Essential system design patterns for senior engineers, covering load balancing, caching strategies, database sharding, message queues, rate limiting, circuit breakers, CQRS, and event sourcing, with practical architectural examples.”


System design separates senior engineers from junior ones. Junior engineers write code; senior engineers design systems. After two decades of architecting systems at every scale, from startups to enterprises handling billions of requests, I have noticed that certain patterns keep recurring in successful systems.

These patterns are not theoretical exercises. They are battle-tested solutions to real problems, and understanding them deeply changes how you approach system design.

Load Balancing: Distributing the Load

Load balancers sit at the front of your system, distributing incoming requests across multiple servers. The naive approach, simple round-robin, works until it doesn't.

Load Balancing Strategies

┌─────────────────────────────────────────────────┐
│              Incoming Traffic                    │
└─────────────────────────────────────────────────┘

           ┌──────────┴──────────┐
           │   Load Balancer     │
           └──────────┬──────────┘

        ┌─────────────┼─────────────┐
        │             │             │
    ┌────────┐  ┌────────┐  ┌────────┐
    │Server 1│  │Server 2│  │Server 3│
    └────────┘  └────────┘  └────────┘

Round-robin distributes requests evenly but ignores server load. Least connections tracks the active connections on each server and routes each request to the least busy one. Weighted round-robin assigns each server a different share of the load according to its capacity.
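Weighted round-robin is the one strategy above that the longer examples do not cover, so here is a minimal sketch of it. It uses the "smooth" variant of the algorithm, which interleaves picks instead of sending bursts to the heaviest server. The names `WeightedServer` and `pick_weighted`, the addresses, and the weights are all hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass


@dataclass
class WeightedServer:
    address: str      # hypothetical address, for illustration only
    weight: int       # relative capacity; a higher weight receives more traffic
    current: int = 0  # running score used by the smoothing step


def pick_weighted(servers: list[WeightedServer]) -> WeightedServer:
    """Smooth weighted round-robin: picks are proportional to weight."""
    total = sum(s.weight for s in servers)
    # Every server earns its weight each round...
    for s in servers:
        s.current += s.weight
    # ...and the current leader is chosen, then pays back the total.
    chosen = max(servers, key=lambda s: s.current)
    chosen.current -= total
    return chosen


servers = [
    WeightedServer("10.0.0.1:8080", weight=5),
    WeightedServer("10.0.0.2:8080", weight=1),
    WeightedServer("10.0.0.3:8080", weight=1),
]
# One full cycle is sum(weights) = 7 picks.
picks = [pick_weighted(servers).address for _ in range(7)]
```

Over one full cycle of seven picks, the server with weight 5 is chosen five times and the other two once each. A real balancer would combine this with the health checks shown in the implementations that follow.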

// Load balancer implementation with health checks
class LoadBalancer {
  private servers: Server[] = [];
  private currentIndex = 0;
  private healthCheckInterval?: ReturnType<typeof setInterval>;

  constructor(serverAddresses: string[]) {
    this.servers = serverAddresses.map(
      addr => new Server(addr)
    );
    this.startHealthChecks();
  }

  private startHealthChecks(): void {
    this.healthCheckInterval = setInterval(() => {
      this.servers.forEach(server => {
        this.checkServerHealth(server);
      });
    }, 5000); // Check every 5 seconds
  }

  private async checkServerHealth(server: Server): Promise<void> {
    try {
      const response = await fetch(`http://${server.address}/health`, {
        // fetch has no `timeout` option; abort the request after 2 seconds instead
        signal: AbortSignal.timeout(2000),
      });
      server.healthy = response.ok;
    } catch {
      server.healthy = false;
    }
  }

  // Least connections algorithm
  selectServer(): Server {
    const healthyServers = this.servers.filter(s => s.healthy);
    
    if (healthyServers.length === 0) {
      throw new Error('No healthy servers available');
    }

    return healthyServers.reduce((prev, current) =>
      prev.activeConnections < current.activeConnections ? prev : current
    );
  }

  async handleRequest(request: any): Promise<any> {
    const server = this.selectServer();
    server.activeConnections++;

    try {
      const response = await fetch(`http://${server.address}${request.path}`, {
        method: request.method,
        headers: request.headers,
        body: request.body,
      });
      return response;
    } finally {
      server.activeConnections--;
    }
  }
}

class Server {
  address: string;
  healthy: boolean = true;
  activeConnections: number = 0;

  constructor(address: string) {
    this.address = address;
  }
}
import org.springframework.stereotype.Component;
import java.net.URI;
import java.net.http.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class LoadBalancer {

    private final List<Server> servers = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final HttpClient httpClient = HttpClient.newBuilder()
        .connectTimeout(Duration.ofMillis(2000))
        .build();

    public LoadBalancer(List<String> serverAddresses) {
        serverAddresses.forEach(addr -> servers.add(new Server(addr)));
        startHealthChecks();
    }

    private void startHealthChecks() {
        scheduler.scheduleAtFixedRate(() ->
            servers.forEach(this::checkServerHealth), 0, 5, TimeUnit.SECONDS);
    }

    private void checkServerHealth(Server server) {
        try {
            var request = HttpRequest.newBuilder()
                .uri(URI.create("http://" + server.address + "/health"))
                .timeout(Duration.ofMillis(2000))
                .GET().build();
            var response = httpClient.send(request, HttpResponse.BodyHandlers.discarding());
            server.healthy = response.statusCode() == 200;
        } catch (Exception e) {
            server.healthy = false;
        }
    }

    // Least connections algorithm
    public Server selectServer() {
        return servers.stream()
            .filter(s -> s.healthy)
            .min(Comparator.comparingInt(s -> s.activeConnections.get()))
            .orElseThrow(() -> new IllegalStateException("No healthy servers available"));
    }

    public HttpResponse<String> handleRequest(String path, String method) throws Exception {
        Server server = selectServer();
        server.activeConnections.incrementAndGet();
        try {
            var request = HttpRequest.newBuilder()
                .uri(URI.create("http://" + server.address + path))
                .method(method, HttpRequest.BodyPublishers.noBody())
                .build();
            return httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        } finally {
            server.activeConnections.decrementAndGet();
        }
    }

    public static class Server {
        final String address;
        volatile boolean healthy = true;
        final AtomicInteger activeConnections = new AtomicInteger(0);

        Server(String address) { this.address = address; }
    }
}
import asyncio
import aiohttp
from dataclasses import dataclass, field

@dataclass
class Server:
    address: str
    healthy: bool = True
    active_connections: int = 0


class LoadBalancer:
    def __init__(self, server_addresses: list[str]):
        self.servers = [Server(addr) for addr in server_addresses]
        self._health_task: asyncio.Task | None = None

    async def start(self):
        self._health_task = asyncio.create_task(self._health_check_loop())

    async def _health_check_loop(self):
        while True:
            await asyncio.gather(
                *[self._check_server_health(s) for s in self.servers],
                return_exceptions=True,
            )
            await asyncio.sleep(5)

    async def _check_server_health(self, server: Server) -> None:
        try:
            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=2)
            ) as session:
                async with session.get(f"http://{server.address}/health") as resp:
                    server.healthy = resp.ok
        except Exception:
            server.healthy = False

    def select_server(self) -> Server:
        healthy = [s for s in self.servers if s.healthy]
        if not healthy:
            raise RuntimeError("No healthy servers available")
        # Least connections
        return min(healthy, key=lambda s: s.active_connections)

    async def handle_request(self, path: str, method: str = "GET") -> tuple[int, bytes]:
        server = self.select_server()
        server.active_connections += 1
        try:
            async with aiohttp.ClientSession() as session:
                async with session.request(method, f"http://{server.address}{path}") as resp:
                    # Read the body before the context managers release the connection;
                    # returning `resp` itself would hand back an already-closed response
                    return resp.status, await resp.read()
        finally:
            server.active_connections -= 1
using System.Collections.Concurrent;

public class LoadBalancer
{
    private readonly List<Server> _servers;
    private readonly HttpClient _httpClient;
    private readonly Timer _healthCheckTimer;

    public LoadBalancer(IEnumerable<string> serverAddresses)
    {
        _servers = serverAddresses.Select(a => new Server(a)).ToList();
        _httpClient = new HttpClient { Timeout = TimeSpan.FromMilliseconds(2000) };
        _healthCheckTimer = new Timer(_ => CheckAllServers(), null,
            TimeSpan.Zero, TimeSpan.FromSeconds(5));
    }

    private void CheckAllServers()
    {
        foreach (var server in _servers)
            _ = CheckServerHealthAsync(server);
    }

    private async Task CheckServerHealthAsync(Server server)
    {
        try
        {
            var response = await _httpClient.GetAsync($"http://{server.Address}/health");
            server.Healthy = response.IsSuccessStatusCode;
        }
        catch
        {
            server.Healthy = false;
        }
    }

    // Least connections algorithm
    public Server SelectServer()
    {
        var healthy = _servers.Where(s => s.Healthy).ToList();
        if (healthy.Count == 0)
            throw new InvalidOperationException("No healthy servers available");

        return healthy.MinBy(s => s.ActiveConnections)!;
    }

    public async Task<HttpResponseMessage> HandleRequestAsync(
        string path, HttpMethod method)
    {
        var server = SelectServer();
        Interlocked.Increment(ref server.ActiveConnectionsField);
        try
        {
            var request = new HttpRequestMessage(method,
                $"http://{server.Address}{path}");
            return await _httpClient.SendAsync(request);
        }
        finally
        {
            Interlocked.Decrement(ref server.ActiveConnectionsField);
        }
    }
}

public class Server
{
    public string Address { get; }
    public volatile bool Healthy = true;
    public int ActiveConnectionsField;
    public int ActiveConnections => ActiveConnectionsField;

    public Server(string address) => Address = address;
}

Caching: The Secret to Performance

I have found that caching separates fast systems from slow ones. Cache correctly and you can cut backend load by as much as 90%. Cache poorly and you get stale-data nightmares.

Multi-Level Caching Architecture

┌──────────────────────────────────────────┐
│           Client Request                  │
└──────────────────────────────────────────┘

┌──────────────────────────────────────────┐
│    L1: Browser Cache (60 seconds)        │
└──────────────────────────────────────────┘

┌──────────────────────────────────────────┐
│  L2: CDN Cache (5 minutes)                │
└──────────────────────────────────────────┘

┌──────────────────────────────────────────┐
│  L3: In-Memory Cache - Redis (1 hour)    │
└──────────────────────────────────────────┘

┌──────────────────────────────────────────┐
│  L4: Database                             │
└──────────────────────────────────────────┘
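The read path through the layers in the diagram can be sketched in a few lines: check the fastest level first, fall through on a miss, and backfill the faster levels on the way back. This is only a sketch under simplifying assumptions. Each level is a plain in-process dictionary standing in for a real browser, CDN, or Redis cache, and `CacheLevel`, `tiered_get`, and `load_from_db` are hypothetical names.

```python
import time
from typing import Callable, Optional


class CacheLevel:
    """One cache tier with its own TTL, backed by a plain dict."""

    def __init__(self, name: str, ttl_seconds: float):
        self.name = name
        self.ttl_seconds = ttl_seconds
        self._data: dict[str, tuple[str, float]] = {}

    def get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._data[key]  # expired: treat as a miss
            return None
        return value

    def set(self, key: str, value: str) -> None:
        self._data[key] = (value, time.monotonic() + self.ttl_seconds)


def tiered_get(levels: list[CacheLevel], key: str, load: Callable[[str], str]) -> str:
    """Check each level in order; on a hit, backfill the faster levels above it."""
    for i, level in enumerate(levels):
        value = level.get(key)
        if value is not None:
            for upper in levels[:i]:
                upper.set(key, value)
            return value
    value = load(key)  # every level missed: fall through to the source of truth
    for level in levels:
        level.set(key, value)
    return value


db_calls: list[str] = []


def load_from_db(key: str) -> str:
    # Stand-in for a real database query
    db_calls.append(key)
    return f"row-for-{key}"


levels = [CacheLevel("in-process", 60), CacheLevel("shared", 3600)]
first = tiered_get(levels, "user:1", load_from_db)
second = tiered_get(levels, "user:1", load_from_db)  # served from cache
```

The second lookup never reaches `load_from_db`, which is the whole point of the layering: shorter TTLs near the client keep data fresh, while longer TTLs deeper in the stack absorb the load.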
// Cache-aside pattern with Redis
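// Uses the ioredis client, whose promise-based get/setex/del match the calls below.
// `database` and `User` are assumed to be provided by the application.
import Redis from 'ioredis';

const client = new Redis({ host: 'localhost', port: 6379 });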

async function getUser(userId: string): Promise<User> {
  // Check Redis first (L3 cache)
  const cached = await client.get(`user:${userId}`);
  
  if (cached) {
    return JSON.parse(cached);
  }

  // Cache miss: query database
  const user = await database.query('SELECT * FROM users WHERE id = ?', [userId]);
  
  // Store in cache for 1 hour
  await client.setex(`user:${userId}`, 3600, JSON.stringify(user));
  
  return user;
}

// Cache invalidation on updates
async function updateUser(userId: string, updates: Partial<User>): Promise<User> {
  const user = await database.query('UPDATE users SET ? WHERE id = ?', [
    updates,
    userId,
  ]);

  // Invalidate cache
  await client.del(`user:${userId}`);

  return user;
}

// Batch cache warming
async function warmCache(): Promise<void> {
  const users = await database.query('SELECT * FROM users LIMIT 1000');
  
  for (const user of users) {
    await client.setex(
      `user:${user.id}`,
      3600,
      JSON.stringify(user)
    );
  }
}
import org.springframework.cache.annotation.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.List;

@Service
public class UserCacheService {

    private final StringRedisTemplate redis;
    private final UserRepository userRepository;
    private final ObjectMapper objectMapper;

    public UserCacheService(StringRedisTemplate redis,
                             UserRepository userRepository,
                             ObjectMapper objectMapper) {
        this.redis = redis;
        this.userRepository = userRepository;
        this.objectMapper = objectMapper;
    }

    public User getUser(String userId) throws Exception {
        // Check Redis first (L3 cache)
        String cached = redis.opsForValue().get("user:" + userId);
        if (cached != null) {
            return objectMapper.readValue(cached, User.class);
        }

        // Cache miss: query database
        User user = userRepository.findById(userId)
            .orElseThrow(() -> new RuntimeException("User not found"));

        // Store in cache for 1 hour
        redis.opsForValue().set(
            "user:" + userId,
            objectMapper.writeValueAsString(user),
            Duration.ofHours(1)
        );

        return user;
    }

    public User updateUser(String userId, UserUpdates updates) throws Exception {
        User user = userRepository.update(userId, updates);
        // Invalidate cache
        redis.delete("user:" + userId);
        return user;
    }

    public void warmCache() throws Exception {
        List<User> users = userRepository.findTop1000();
        for (User user : users) {
            redis.opsForValue().set(
                "user:" + user.getId(),
                objectMapper.writeValueAsString(user),
                Duration.ofHours(1)
            );
        }
    }
}
import redis
import json
from dataclasses import dataclass, asdict

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)


@dataclass
class User:
    id: str
    email: str
    name: str


def get_user(user_id: str) -> User:
    # Check Redis first (L3 cache)
    cached = redis_client.get(f"user:{user_id}")
    if cached:
        return User(**json.loads(cached))

    # Cache miss: query database
    user = database_query("SELECT * FROM users WHERE id = %s", [user_id])

    # Store in cache for 1 hour
    redis_client.setex(f"user:{user_id}", 3600, json.dumps(asdict(user)))
    return user


def update_user(user_id: str, updates: dict) -> User:
    user = database_query("UPDATE users SET ... WHERE id = %s", [user_id])
    # Invalidate cache
    redis_client.delete(f"user:{user_id}")
    return user


def warm_cache() -> None:
    users = database_query("SELECT * FROM users LIMIT 1000")
    pipeline = redis_client.pipeline()
    for user in users:
        pipeline.setex(f"user:{user.id}", 3600, json.dumps(asdict(user)))
    pipeline.execute()


def database_query(sql: str, params: list = None):
    # Placeholder for actual DB call
    raise NotImplementedError
using Microsoft.Extensions.Caching.Distributed;
using System.Text.Json;

public class UserCacheService
{
    private readonly IDistributedCache _cache;
    private readonly IUserRepository _userRepository;

    public UserCacheService(IDistributedCache cache, IUserRepository userRepository)
    {
        _cache = cache;
        _userRepository = userRepository;
    }

    public async Task<User> GetUserAsync(string userId)
    {
        // Check Redis first (L3 cache)
        var cached = await _cache.GetStringAsync($"user:{userId}");
        if (cached is not null)
            return JsonSerializer.Deserialize<User>(cached)!;

        // Cache miss: query database
        var user = await _userRepository.FindByIdAsync(userId)
            ?? throw new KeyNotFoundException($"User {userId} not found");

        // Store in cache for 1 hour
        var options = new DistributedCacheEntryOptions
        {
            AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
        };
        await _cache.SetStringAsync(
            $"user:{userId}", JsonSerializer.Serialize(user), options);

        return user;
    }

    public async Task<User> UpdateUserAsync(string userId, UserUpdates updates)
    {
        var user = await _userRepository.UpdateAsync(userId, updates);
        // Invalidate cache
        await _cache.RemoveAsync($"user:{userId}");
        return user;
    }

    public async Task WarmCacheAsync()
    {
        var users = await _userRepository.FindTop1000Async();
        var options = new DistributedCacheEntryOptions
        {
            AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
        };

        var tasks = users.Select(user =>
            _cache.SetStringAsync(
                $"user:{user.Id}", JsonSerializer.Serialize(user), options));

        await Task.WhenAll(tasks);
    }
}

Cache Eviction Policies

Different workloads call for different eviction strategies:

  • LRU (Least Recently Used): the best default for most use cases; evicts the items that were accessed longest ago
  • LFU (Least Frequently Used): better for stable working sets; evicts the items that are rarely accessed
  • TTL (Time To Live): simple time-based expiration; useful for session data
  • Sliding Window: extends the expiration on each access; good for active users
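Of the policies in the list above, LRU is the one most often implemented by hand, so a short sketch may help. It relies on `collections.OrderedDict` from the standard library to keep keys in access order. The class name `LRUCache` and the capacity of 2 are illustrative choices, not taken from any particular library.

```python
from collections import OrderedDict
from typing import Generic, Hashable, Optional, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class LRUCache(Generic[K, V]):
    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.capacity = capacity
        # Keys are kept in access order: oldest first, newest last
        self._items: "OrderedDict[K, V]" = OrderedDict()

    def get(self, key: K) -> Optional[V]:
        if key not in self._items:
            return None
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def set(self, key: K, value: V) -> None:
        if key in self._items:
            self._items.move_to_end(key)
        self._items[key] = value
        if len(self._items) > self.capacity:
            self._items.popitem(last=False)  # evict the least recently used


cache: LRUCache[str, int] = LRUCache(capacity=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")     # touch "a" so "b" becomes the eviction candidate
cache.set("c", 3)  # over capacity: "b" is evicted
```

After these calls the cache holds "a" and "c". This version is not thread-safe; a shared cache would need a lock around `get` and `set`.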
// Custom cache implementation with TTL
class ExpiringCache<K, V> {
  private cache: Map<K, { value: V; expiresAt: number }> = new Map();

  set(key: K, value: V, ttlSeconds: number): void {
    const expiresAt = Date.now() + ttlSeconds * 1000;
    this.cache.set(key, { value, expiresAt });
  }

  get(key: K): V | null {
    const item = this.cache.get(key);
    
    if (!item) return null;
    
    if (Date.now() > item.expiresAt) {
      this.cache.delete(key);
      return null;
    }

    return item.value;
  }

  // Cleanup expired entries periodically
  cleanup(): void {
    const now = Date.now();
    const expiredKeys: K[] = [];

    for (const [key, item] of this.cache.entries()) {
      if (now > item.expiresAt) {
        expiredKeys.push(key);
      }
    }

    expiredKeys.forEach(key => this.cache.delete(key));
  }
}
import java.util.Map;
import java.util.concurrent.*;

public class ExpiringCache<K, V> {

    private record Entry<V>(V value, long expiresAt) {}

    private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public ExpiringCache() {
        // Cleanup expired entries every 60 seconds
        scheduler.scheduleAtFixedRate(this::cleanup, 60, 60, TimeUnit.SECONDS);
    }

    public void set(K key, V value, long ttlSeconds) {
        long expiresAt = System.currentTimeMillis() + ttlSeconds * 1000L;
        cache.put(key, new Entry<>(value, expiresAt));
    }

    public V get(K key) {
        Entry<V> entry = cache.get(key);
        if (entry == null) return null;

        if (System.currentTimeMillis() > entry.expiresAt()) {
            cache.remove(key);
            return null;
        }

        return entry.value();
    }

    public void cleanup() {
        long now = System.currentTimeMillis();
        cache.entrySet().removeIf(e -> now > e.getValue().expiresAt());
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
import time
import threading
from typing import Generic, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class ExpiringCache(Generic[K, V]):
    def __init__(self):
        self._cache: dict[K, tuple[V, float]] = {}
        self._lock = threading.Lock()

    def set(self, key: K, value: V, ttl_seconds: int) -> None:
        expires_at = time.time() + ttl_seconds
        with self._lock:
            self._cache[key] = (value, expires_at)

    def get(self, key: K) -> V | None:
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return None
            value, expires_at = entry
            if time.time() > expires_at:
                del self._cache[key]
                return None
            return value

    def cleanup(self) -> None:
        now = time.time()
        with self._lock:
            expired = [k for k, (_, exp) in self._cache.items() if now > exp]
            for key in expired:
                del self._cache[key]
using System.Collections.Concurrent;

public class ExpiringCache<K, V> where K : notnull
{
    private record Entry(V Value, long ExpiresAt);

    private readonly ConcurrentDictionary<K, Entry> _cache = new();
    private readonly Timer _cleanupTimer;

    public ExpiringCache()
    {
        _cleanupTimer = new Timer(_ => Cleanup(), null,
            TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(60));
    }

    public void Set(K key, V value, int ttlSeconds)
    {
        var expiresAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + ttlSeconds * 1000L;
        _cache[key] = new Entry(value, expiresAt);
    }

    public V? Get(K key)
    {
        if (!_cache.TryGetValue(key, out var entry))
            return default;

        if (DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() > entry.ExpiresAt)
        {
            _cache.TryRemove(key, out _);
            return default;
        }

        return entry.Value;
    }

    public void Cleanup()
    {
        var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        foreach (var key in _cache.Keys)
        {
            if (_cache.TryGetValue(key, out var entry) && now > entry.ExpiresAt)
                _cache.TryRemove(key, out _);
        }
    }
}

Database Sharding: Horizontal Scaling

When a single database reaches its limits, sharding distributes the data across multiple database instances according to a shard key.

Sharding Strategies

┌─────────────────────────────────────┐
│     Distributed Request              │
└─────────────────────────────────────┘

      ┌───────┴────────┐
      │  Shard Router  │ (hash function)
      └───────┬────────┘

    ┌─────────┼─────────┐
    │         │         │
┌────────┐ ┌────────┐ ┌─────────┐
│Shard 1 │ │Shard 2 │ │Shard 3  │
│(0-33%) │ │(33-66%)│ │(66-100%)│
└────────┘ └────────┘ └─────────┘
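The simplest hash function a router could use is `hash(key) % shard_count`. The sketch below measures what happens to that scheme when a fourth shard is added, which is the usual motivation for consistent hashing. The helper names `stable_hash` and `modulo_shard` and the `user-N` keys are hypothetical.

```python
import hashlib


def stable_hash(key: str) -> int:
    """Deterministic hash; Python's built-in hash() is salted per process."""
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")


def modulo_shard(key: str, shard_count: int) -> int:
    """Naive routing: the shard index is the hash modulo the shard count."""
    return stable_hash(key) % shard_count


keys = [f"user-{i}" for i in range(10_000)]

# Count the keys whose shard changes when growing from 3 to 4 shards
moved = sum(1 for k in keys if modulo_shard(k, 3) != modulo_shard(k, 4))
moved_fraction = moved / len(keys)
```

With a uniform hash, a key keeps its shard only when its hash gives the same remainder modulo 3 and modulo 4, so about three quarters of the keys are expected to move. Consistent hashing aims to move only the keys that land on the new shard.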
// Shard routing with consistent hashing
class ShardRouter {
  private shards: Map<string, Database> = new Map();
  // Ring of virtual nodes, kept sorted by hash position
  private ring: { hash: number; addr: string }[] = [];

  constructor(shardAddresses: string[]) {
    shardAddresses.forEach(addr => {
      this.shards.set(addr, new Database(addr));
      // Add multiple points per shard for load distribution
      for (let i = 0; i < 160; i++) { // 160 virtual nodes
        this.ring.push({ hash: this.hashKey(`${addr}:${i}`), addr });
      }
    });
    // Sort by hash value, not by node name, so lookups can walk the ring
    this.ring.sort((a, b) => a.hash - b.hash);
  }

  // Simple string hash; a stronger hash such as MD5 spreads virtual nodes more evenly
  private hashKey(key: string): number {
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
      hash = ((hash << 5) - hash) + key.charCodeAt(i);
      hash |= 0; // Convert to 32-bit integer
    }
    return Math.abs(hash);
  }

  getShard(key: string): Database {
    const hash = this.hashKey(key);

    // Binary search for the first virtual node with hash >= key hash
    let lo = 0;
    let hi = this.ring.length;
    while (lo < hi) {
      const mid = (lo + hi) >>> 1;
      if (this.ring[mid].hash < hash) lo = mid + 1;
      else hi = mid;
    }

    // Wrap around to the first node when the key hashes past the last one
    const node = this.ring[lo % this.ring.length];
    return this.shards.get(node.addr)!;
  }

  async getUserData(userId: string): Promise<any> {
    const shard = this.getShard(userId);
    return shard.query('SELECT * FROM users WHERE id = ?', [userId]);
  }

  async updateUserData(userId: string, data: any): Promise<void> {
    const shard = this.getShard(userId);
    await shard.query('UPDATE users SET ? WHERE id = ?', [data, userId]);
  }
}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;
import javax.sql.DataSource;

public class ShardRouter {

    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final Map<String, DataSource> shards = new HashMap<>();
    private static final int VIRTUAL_NODES = 160;

    public ShardRouter(List<String> shardAddresses) {
        for (String addr : shardAddresses) {
            shards.put(addr, createDataSource(addr));
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                long hash = hashKey(addr + ":" + i);
                ring.put(hash, addr);
            }
        }
    }

    private long hashKey(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] bytes = md.digest(key.getBytes(StandardCharsets.UTF_8));
            return ((long)(bytes[3] & 0xFF) << 24)
                 | ((long)(bytes[2] & 0xFF) << 16)
                 | ((long)(bytes[1] & 0xFF) << 8)
                 |  (long)(bytes[0] & 0xFF);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public DataSource getShard(String key) {
        long hash = hashKey(key);
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash);
        if (entry == null) entry = ring.firstEntry();
        return shards.get(entry.getValue());
    }

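    public Map<String, Object> getUserData(String userId) throws Exception {
        DataSource shard = getShard(userId);
        try (var conn = shard.getConnection();
             var stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?")) {
            stmt.setString(1, userId);
            try (var rs = stmt.executeQuery()) {
                // Copy the row out before the statement and connection are closed
                if (!rs.next()) return null;
                Map<String, Object> row = new HashMap<>();
                var meta = rs.getMetaData();
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    row.put(meta.getColumnLabel(i), rs.getObject(i));
                }
                return row;
            }
        }
    }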

    public void updateUserData(String userId, Map<String, Object> data) throws Exception {
        DataSource shard = getShard(userId);
        // Execute update on correct shard
    }

    private DataSource createDataSource(String addr) {
        // Configure and return a DataSource for the shard address
        throw new UnsupportedOperationException("Configure actual DataSource");
    }
}
import bisect
import hashlib


class ShardRouter:
    VIRTUAL_NODES = 160

    def __init__(self, shard_addresses: list[str]):
        self._ring: dict[int, str] = {}
        self._shards: dict[str, object] = {}

        for addr in shard_addresses:
            self._shards[addr] = self._create_db_connection(addr)
            for i in range(self.VIRTUAL_NODES):
                hash_val = self._hash_key(f"{addr}:{i}")
                self._ring[hash_val] = addr

        self._sorted_keys: list[int] = sorted(self._ring.keys())

    def _hash_key(self, key: str) -> int:
        digest = hashlib.md5(key.encode()).digest()
        return int.from_bytes(digest[:4], "little")

    def get_shard(self, key: str):
        h = self._hash_key(key)
        # Binary search for the first ring position >= h, wrapping past the end
        idx = bisect.bisect_left(self._sorted_keys, h) % len(self._sorted_keys)
        return self._shards[self._ring[self._sorted_keys[idx]]]

    def get_user_data(self, user_id: str) -> dict:
        shard = self.get_shard(user_id)
        return shard.query("SELECT * FROM users WHERE id = %s", [user_id])

    def update_user_data(self, user_id: str, data: dict) -> None:
        shard = self.get_shard(user_id)
        shard.execute("UPDATE users SET ... WHERE id = %s", [user_id])

    def _create_db_connection(self, addr: str):
        # Return actual DB connection/session factory
        raise NotImplementedError
using System.Data;
using System.Security.Cryptography;
using System.Text;

public class ShardRouter
{
    private readonly SortedDictionary<uint, string> _ring = new();
    private readonly Dictionary<string, IDbConnection> _shards = new();
    private const int VirtualNodes = 160;

    public ShardRouter(IEnumerable<string> shardAddresses)
    {
        foreach (var addr in shardAddresses)
        {
            _shards[addr] = CreateConnection(addr);
            for (int i = 0; i < VirtualNodes; i++)
            {
                var hash = HashKey($"{addr}:{i}");
                _ring.TryAdd(hash, addr);
            }
        }
    }

    private static uint HashKey(string key)
    {
        var bytes = MD5.HashData(Encoding.UTF8.GetBytes(key));
        return BitConverter.ToUInt32(bytes, 0);
    }

    public IDbConnection GetShard(string key)
    {
        var hash = HashKey(key);
        var entry = _ring.FirstOrDefault(kv => kv.Key >= hash);
        var addr = entry.Equals(default(KeyValuePair<uint, string>))
            ? _ring.First().Value
            : entry.Value;
        return _shards[addr];
    }

    public async Task<object?> GetUserDataAsync(string userId)
    {
        var shard = GetShard(userId);
        // Execute query on correct shard
        using var cmd = shard.CreateCommand();
        cmd.CommandText = "SELECT * FROM users WHERE id = @id";
        // add parameter and execute...
        return await Task.FromResult<object?>(null);
    }

    public async Task UpdateUserDataAsync(string userId, object data)
    {
        var shard = GetShard(userId);
        // Execute update on correct shard
        await Task.CompletedTask;
    }

    private IDbConnection CreateConnection(string addr)
    {
        // Configure and return actual DB connection
        throw new NotImplementedException("Configure actual DB connection");
    }
}

Message Queues: Decoupling Services

Message queues decouple producers from consumers, enabling asynchronous processing and improving system resilience.

┌────────────┐
│  Producer  │
└──────┬─────┘


┌─────────────────┐
│ Message Queue   │ (RabbitMQ, Kafka)
└────────┬────────┘

    ┌────┴────┐
    │          │
    ▼          ▼
┌─────────┐  ┌─────────┐
│Consumer1│  │Consumer2│
└─────────┘  └─────────┘
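Before bringing in a broker, the producer and consumer flow in the diagram can be sketched in-process with `asyncio.Queue` from the standard library. This is only an analogy under stated assumptions: there is no durability, no network, and `task_done()` stands in for a broker acknowledgement. The event types and the names `producer`, `consumer`, and `main` are hypothetical.

```python
import asyncio


async def producer(queue: asyncio.Queue, events: list[dict]) -> None:
    for event in events:
        # The producer returns as soon as the event is enqueued;
        # it never waits for a consumer to finish the work
        await queue.put(event)


async def consumer(name: str, queue: asyncio.Queue, handled: list) -> None:
    while True:
        event = await queue.get()
        try:
            handled.append((name, event["type"]))
        finally:
            queue.task_done()  # acknowledge, analogous to a broker ack


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    handled: list = []
    workers = [
        asyncio.create_task(consumer(f"consumer-{i}", queue, handled))
        for i in (1, 2)
    ]
    await producer(queue, [{"type": "user.created"}, {"type": "order.placed"}])
    await queue.join()  # wait until every event has been acknowledged
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return handled


handled = asyncio.run(main())
```

Each event is handled exactly once by whichever consumer is free, and the producer's code does not change when consumers are added or removed. A real broker adds what this sketch lacks: persistence, delivery across processes, and redelivery after a crash.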
// Message queue pattern with RabbitMQ
import amqp from 'amqplib';

class MessageQueue {
  private connection: any;
  private channel: any;

  async connect(): Promise<void> {
    this.connection = await amqp.connect('amqp://localhost');
    this.channel = await this.connection.createChannel();
  }

  // Producer: send message
  async publishEvent(eventType: string, payload: any): Promise<void> {
    const exchange = 'events';
    
    await this.channel.assertExchange(exchange, 'topic', { durable: true });
    
    const message = JSON.stringify({
      type: eventType,
      timestamp: new Date(),
      payload,
    });

    this.channel.publish(
      exchange,
      eventType,
      Buffer.from(message),
      { persistent: true }
    );

    console.log(`Published event: ${eventType}`);
  }

  // Consumer: subscribe to events
  async subscribeToEvent(
    eventType: string,
    handler: (message: any) => Promise<void>
  ): Promise<void> {
    const exchange = 'events';
    const queue = `${eventType}_queue`;

    await this.channel.assertExchange(exchange, 'topic', { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, eventType);

    // Prefetch one message at a time
    await this.channel.prefetch(1);

    this.channel.consume(queue, async (msg: any) => {
      if (msg) {
        try {
          const content = JSON.parse(msg.content.toString());
          await handler(content);
          this.channel.ack(msg);
        } catch (error) {
          console.error('Error processing message:', error);
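          // Nack and requeue. Note: a message that always fails will be redelivered
          // indefinitely; production setups usually route it to a dead-letter exchange.
          this.channel.nack(msg, false, true);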
        }
      }
    });
  }

  async close(): Promise<void> {
    await this.channel.close();
    await this.connection.close();
  }
}

// Usage
const mq = new MessageQueue();
await mq.connect();

// Producer
await mq.publishEvent('user.created', { userId: 123, email: 'user@example.com' });

// Consumer
await mq.subscribeToEvent('user.created', async (event) => {
  console.log('New user created:', event.payload);
  // Send welcome email, initialize user profile, etc.
});
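// Explicit imports: Queue and Exchange exist in both the annotation and core
// packages, so wildcard imports of both make the annotations ambiguous
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.Map;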

@Service
public class MessageQueueService {

    private final RabbitTemplate rabbitTemplate;
    private static final String EXCHANGE = "events";

    public MessageQueueService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // Producer: publish event
    public void publishEvent(String eventType, Object payload) {
        var message = Map.of(
            "type", eventType,
            "timestamp", java.time.Instant.now().toString(),
            "payload", payload
        );
        rabbitTemplate.convertAndSend(EXCHANGE, eventType, message);
        System.out.println("Published event: " + eventType);
    }
}

// Consumer: subscribe with Spring AMQP annotation
@Component
class UserEventConsumer {

    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "user.created_queue", durable = "true"),
        exchange = @Exchange(value = "events", type = ExchangeTypes.TOPIC),
        key = "user.created"
    ))
    public void handleUserCreated(Map<String, Object> event) {
        System.out.println("New user created: " + event.get("payload"));
        // Send welcome email, initialize user profile, etc.
    }
}
import aio_pika
import json
from collections.abc import Callable, Awaitable
from datetime import datetime, timezone


class MessageQueue:
    def __init__(self):
        self.connection = None
        self.channel = None

    async def connect(self) -> None:
        self.connection = await aio_pika.connect_robust("amqp://localhost")
        self.channel = await self.connection.channel()

    async def publish_event(self, event_type: str, payload: dict) -> None:
        exchange = await self.channel.declare_exchange(
            "events", aio_pika.ExchangeType.TOPIC, durable=True
        )
        message_body = json.dumps({
            "type": event_type,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "payload": payload,
        })
        await exchange.publish(
            aio_pika.Message(
                body=message_body.encode(),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            ),
            routing_key=event_type,
        )
        print(f"Published event: {event_type}")

    async def subscribe_to_event(
        self,
        event_type: str,
        handler: Callable[[dict], Awaitable[None]],
    ) -> None:
        exchange = await self.channel.declare_exchange(
            "events", aio_pika.ExchangeType.TOPIC, durable=True
        )
        queue = await self.channel.declare_queue(
            f"{event_type}_queue", durable=True
        )
        await queue.bind(exchange, routing_key=event_type)
        await self.channel.set_qos(prefetch_count=1)

        async def on_message(message: aio_pika.IncomingMessage):
            async with message.process(requeue=True):
                content = json.loads(message.body.decode())
                await handler(content)

        await queue.consume(on_message)

    async def close(self) -> None:
        if self.connection:
            await self.connection.close()


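# Usage
import asyncio


async def main():
    mq = MessageQueue()
    await mq.connect()

    async def handle_user_created(event: dict):
        print("New user created:", event["payload"])

    # Subscribe first so the queue exists before anything is published
    await mq.subscribe_to_event("user.created", handle_user_created)
    await mq.publish_event("user.created", {"userId": 123, "email": "user@example.com"})

    # Keep the consumer running until the process is stopped
    await asyncio.Future()


asyncio.run(main())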
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using System.Text.Json;

// Producer service
public class MessageQueueService
{
    private readonly IPublishEndpoint _publishEndpoint;

    public MessageQueueService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public async Task PublishEventAsync(string eventType, object payload)
    {
        var message = new EventMessage
        {
            Type = eventType,
            Timestamp = DateTime.UtcNow,
            Payload = JsonSerializer.Serialize(payload)
        };

        await _publishEndpoint.Publish(message);
        Console.WriteLine($"Published event: {eventType}");
    }
}

public record EventMessage
{
    public string Type { get; init; } = "";
    public DateTime Timestamp { get; init; }
    public string Payload { get; init; } = "";
}

// Consumer: implement IConsumer<T> with MassTransit
public class UserCreatedConsumer : IConsumer<EventMessage>
{
    public async Task Consume(ConsumeContext<EventMessage> context)
    {
        // Payload is already a JSON string; log it directly (interpolating a
        // deserialized Dictionary would only print its type name)
        Console.WriteLine($"New user created: {context.Message.Payload}");
        // Send welcome email, initialize user profile, etc.
        await Task.CompletedTask;
    }
}

// Registration (Program.cs / Startup)
// services.AddMassTransit(x => {
//     x.AddConsumer<UserCreatedConsumer>();
//     x.UsingRabbitMq((ctx, cfg) => {
//         cfg.Host("localhost");
//         cfg.ConfigureEndpoints(ctx);
//     });
// });
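
The examples above bind each queue with an exact routing key such as `user.created`, but they declare a `topic` exchange, which also accepts wildcard binding patterns: `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below is a standalone illustration of that matching rule; `topicMatches` is a hypothetical helper written for this article, not part of amqplib or of the broker itself.

```typescript
// Hypothetical helper: mimics how a topic exchange compares a binding
// pattern with a routing key. '*' = exactly one word, '#' = zero or more.
function topicMatches(pattern: string, routingKey: string): boolean {
  const patternWords = pattern.split('.');
  const keyWords = routingKey === '' ? [] : routingKey.split('.');

  function match(pi: number, ki: number): boolean {
    // Pattern exhausted: it is a match only if the key is exhausted too
    if (pi === patternWords.length) return ki === keyWords.length;

    if (patternWords[pi] === '#') {
      // Let '#' absorb zero, one, two, ... of the remaining words
      for (let next = ki; next <= keyWords.length; next++) {
        if (match(pi + 1, next)) return true;
      }
      return false;
    }

    if (ki === keyWords.length) return false; // key ran out of words
    if (patternWords[pi] === '*' || patternWords[pi] === keyWords[ki]) {
      return match(pi + 1, ki + 1);
    }
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('user.*', 'user.created'));         // true
console.log(topicMatches('user.*', 'user.profile.updated')); // false
console.log(topicMatches('user.#', 'user.profile.updated')); // true
```

Binding a queue with `user.*` instead of `user.created` would therefore deliver every single-word user event to the same consumer, which is one reason to keep event names hierarchical.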

Rate Limiting: Protecting Your System

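Rate limiting prevents abuse and ensures fair resource allocation. The examples below use the token bucket algorithm: each user gets a bucket that refills at a steady rate, so short bursts up to the bucket's capacity are allowed while the long-run request rate stays capped.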

// Token bucket algorithm
class TokenBucket {
  // Public so the middleware below can report the remaining tokens
  public tokens: number;
  private refillRate: number; // tokens per second
  private lastRefillTime: number = Date.now();
  private maxTokens: number;

  constructor(capacity: number, refillRate: number) {
    this.maxTokens = capacity;
    this.tokens = capacity;
    this.refillRate = refillRate;
  }

  private refill(): void {
    const now = Date.now();
    const timePassed = (now - this.lastRefillTime) / 1000;
    const tokensToAdd = timePassed * this.refillRate;

    this.tokens = Math.min(this.maxTokens, this.tokens + tokensToAdd);
    this.lastRefillTime = now;
  }

  allowRequest(tokensRequired: number = 1): boolean {
    this.refill();

    if (this.tokens >= tokensRequired) {
      this.tokens -= tokensRequired;
      return true;
    }

    return false;
  }
}

// Per-user rate limiting middleware
const userBuckets = new Map<string, TokenBucket>();

function rateLimitMiddleware(req: any, res: any, next: any) {
  const userId = req.user.id;
  
  // 100 requests per minute
  if (!userBuckets.has(userId)) {
    userBuckets.set(userId, new TokenBucket(100, 100 / 60));
  }

  const bucket = userBuckets.get(userId)!;

  if (!bucket.allowRequest()) {
    return res.status(429).json({
      error: 'Rate limit exceeded',
      retryAfter: 60,
    });
  }

  // Token counts are fractional; report a whole number to clients
  res.setHeader('X-RateLimit-Remaining', Math.floor(bucket.tokens));
  next();
}
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import jakarta.servlet.*;
import jakarta.servlet.http.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;

@Component
public class RateLimitFilter extends OncePerRequestFilter {

    private final ConcurrentHashMap<String, TokenBucket> userBuckets = new ConcurrentHashMap<>();

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                     HttpServletResponse response,
                                     FilterChain chain) throws IOException, ServletException {
        String userId = (String) request.getAttribute("userId");
        if (userId == null) { chain.doFilter(request, response); return; }

        TokenBucket bucket = userBuckets.computeIfAbsent(userId,
            k -> new TokenBucket(100, 100.0 / 60.0)); // 100 req/min

        if (!bucket.allowRequest()) {
            response.setStatus(429);
            response.setContentType("application/json");
            response.getWriter().write("{\"error\":\"Rate limit exceeded\",\"retryAfter\":60}");
            return;
        }

        response.setHeader("X-RateLimit-Remaining", String.valueOf((int) bucket.getTokens()));
        chain.doFilter(request, response);
    }

    static class TokenBucket {
        private double tokens;
        private final double refillRate;
        private long lastRefillTime;
        private final double maxTokens;

        TokenBucket(double capacity, double refillRate) {
            this.maxTokens = capacity;
            this.tokens = capacity;
            this.refillRate = refillRate;
            this.lastRefillTime = System.currentTimeMillis();
        }

        private synchronized void refill() {
            long now = System.currentTimeMillis();
            double timePassed = (now - lastRefillTime) / 1000.0;
            tokens = Math.min(maxTokens, tokens + timePassed * refillRate);
            lastRefillTime = now;
        }

        public synchronized boolean allowRequest() {
            refill();
            if (tokens >= 1) { tokens--; return true; }
            return false;
        }

        public synchronized double getTokens() { return tokens; }
    }
}
import time
import threading
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()


class TokenBucket:
    def __init__(self, capacity: float, refill_rate: float):
        self.max_tokens = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self._lock = threading.Lock()

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.max_tokens, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def allow_request(self, tokens_required: float = 1.0) -> bool:
        with self._lock:
            self._refill()
            if self.tokens >= tokens_required:
                self.tokens -= tokens_required
                return True
            return False


user_buckets: dict[str, TokenBucket] = {}
_buckets_lock = threading.Lock()


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    user_id = getattr(request.state, "user_id", None)
    if user_id is None:
        return await call_next(request)

    with _buckets_lock:
        if user_id not in user_buckets:
            user_buckets[user_id] = TokenBucket(100, 100 / 60)  # 100 req/min

    bucket = user_buckets[user_id]

    if not bucket.allow_request():
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded", "retryAfter": 60},
        )

    response = await call_next(request)
    response.headers["X-RateLimit-Remaining"] = str(int(bucket.tokens))
    return response
using Microsoft.AspNetCore.Http;
using System.Collections.Concurrent;
using System.Text.Json;

public class RateLimitMiddleware
{
    private readonly RequestDelegate _next;
    private readonly ConcurrentDictionary<string, TokenBucket> _userBuckets = new();

    public RateLimitMiddleware(RequestDelegate next) => _next = next;

    public async Task InvokeAsync(HttpContext context)
    {
        var userId = context.User?.Identity?.Name;
        if (userId is null) { await _next(context); return; }

        var bucket = _userBuckets.GetOrAdd(userId,
            _ => new TokenBucket(100, 100.0 / 60.0)); // 100 req/min

        if (!bucket.AllowRequest())
        {
            context.Response.StatusCode = 429;
            context.Response.ContentType = "application/json";
            await context.Response.WriteAsync(
                JsonSerializer.Serialize(new { error = "Rate limit exceeded", retryAfter = 60 }));
            return;
        }

        context.Response.Headers["X-RateLimit-Remaining"] = ((int)bucket.Tokens).ToString();
        await _next(context);
    }
}

public class TokenBucket
{
    private double _tokens;
    private readonly double _refillRate;
    private long _lastRefillMs;
    private readonly double _maxTokens;
    private readonly object _lock = new();

    public TokenBucket(double capacity, double refillRate)
    {
        _maxTokens = capacity;
        _tokens = capacity;
        _refillRate = refillRate;
        _lastRefillMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    }

    private void Refill()
    {
        var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var elapsed = (now - _lastRefillMs) / 1000.0;
        _tokens = Math.Min(_maxTokens, _tokens + elapsed * _refillRate);
        _lastRefillMs = now;
    }

    public bool AllowRequest(double tokensRequired = 1)
    {
        lock (_lock)
        {
            Refill();
            if (_tokens >= tokensRequired) { _tokens -= tokensRequired; return true; }
            return false;
        }
    }

    public double Tokens { get { lock (_lock) { Refill(); return _tokens; } } }
}
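
The middleware examples above return a fixed `retryAfter` of 60 seconds, although a token bucket knows how long it will take to regain the missing tokens. The following is a minimal sketch of deriving that hint from the bucket state. `TimedTokenBucket`, `tryConsume`, and the injectable `now` clock are hypothetical names introduced here for illustration; they are not part of the classes above.

```typescript
interface ConsumeResult {
  allowed: boolean;
  remaining: number;         // whole tokens left after this call
  retryAfterSeconds: number; // 0 when the request was allowed
}

// Hypothetical variant of the token bucket with an injectable clock
class TimedTokenBucket {
  private tokens: number;
  private lastRefillMs: number;
  private readonly capacity: number;
  private readonly refillPerSecond: number;
  private readonly now: () => number;

  constructor(capacity: number, refillPerSecond: number, now: () => number = () => Date.now()) {
    this.capacity = capacity;
    this.refillPerSecond = refillPerSecond;
    this.now = now;
    this.tokens = capacity;
    this.lastRefillMs = now();
  }

  private refill(): void {
    const nowMs = this.now();
    const elapsedSeconds = (nowMs - this.lastRefillMs) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.refillPerSecond);
    this.lastRefillMs = nowMs;
  }

  tryConsume(cost: number = 1): ConsumeResult {
    this.refill();
    if (this.tokens >= cost) {
      this.tokens -= cost;
      return { allowed: true, remaining: Math.floor(this.tokens), retryAfterSeconds: 0 };
    }
    // Time needed to refill the shortfall, rounded up to whole seconds
    const missing = cost - this.tokens;
    return {
      allowed: false,
      remaining: Math.floor(this.tokens),
      retryAfterSeconds: Math.ceil(missing / this.refillPerSecond),
    };
  }
}

// Usage with a fake clock so the behaviour is deterministic
let fakeNowMs = 0;
const bucket = new TimedTokenBucket(2, 1, () => fakeNowMs);
bucket.tryConsume();
bucket.tryConsume();
console.log(bucket.tryConsume()); // rejected, with a retry hint
fakeNowMs = 1000;
console.log(bucket.tryConsume().allowed); // one token has been refilled
```

At the article's limit of 100 requests per minute (100 / 60 tokens per second), an empty bucket regains one token after 0.6 seconds, so the computed hint rounds up to 1 second rather than a fixed 60.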

Circuit Breaker: Handling Failures Gracefully

A circuit breaker prevents cascading failures by temporarily stopping requests to failing services.

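CLOSED (normal)      ── failure threshold reached ──►  OPEN (fail fast)
OPEN (fail fast)     ── timeout elapsed ────────────►  HALF_OPEN (testing)
HALF_OPEN (testing)  ── enough successes ───────────►  CLOSED (normal)
HALF_OPEN (testing)  ── any failure ────────────────►  OPEN (fail fast)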
enum CircuitState {
  CLOSED = 'CLOSED',
  OPEN = 'OPEN',
  HALF_OPEN = 'HALF_OPEN',
}

class CircuitBreaker {
  private state: CircuitState = CircuitState.CLOSED;
  private failureCount: number = 0;
  private successCount: number = 0;
  private lastFailureTime: number = 0;
  private readonly failureThreshold: number = 5;
  private readonly successThreshold: number = 2;
  private readonly timeout: number = 60000; // 60 seconds

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      if (Date.now() - this.lastFailureTime > this.timeout) {
        this.state = CircuitState.HALF_OPEN;
        this.successCount = 0;
      } else {
        throw new Error('Circuit breaker is OPEN');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    this.failureCount = 0;

    if (this.state === CircuitState.HALF_OPEN) {
      this.successCount++;
      if (this.successCount >= this.successThreshold) {
        this.state = CircuitState.CLOSED;
      }
    }
  }

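  private onFailure(): void {
    this.lastFailureTime = Date.now();
    this.failureCount++;

    // Any failure while HALF_OPEN reopens the circuit immediately;
    // otherwise open only once the failure threshold is reached
    if (
      this.state === CircuitState.HALF_OPEN ||
      this.failureCount >= this.failureThreshold
    ) {
      this.state = CircuitState.OPEN;
    }
  }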

  getState(): CircuitState {
    return this.state;
  }
}

// Usage
const breaker = new CircuitBreaker();

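async function callExternalAPI(): Promise<any> {
  return breaker.execute(async () => {
    const r = await fetch('https://external-api.com/data');
    // fetch resolves on HTTP errors, so throw to count them as failures
    if (!r.ok) throw new Error(`HTTP ${r.status}`);
    return r.json();
  });
}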
import io.github.resilience4j.circuitbreaker.*;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.net.http.*;
import java.time.Duration;
import java.util.concurrent.Callable;

@Service
public class ExternalAPIService {

    private final CircuitBreaker circuitBreaker;
    private final HttpClient httpClient = HttpClient.newHttpClient();

    public ExternalAPIService() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .waitDurationInOpenState(Duration.ofSeconds(60))
            .slidingWindowSize(10)
            .permittedNumberOfCallsInHalfOpenState(2)
            .build();

        CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
        this.circuitBreaker = registry.circuitBreaker("externalApi");
    }

    public String callExternalAPI() throws Exception {
        Callable<String> decoratedCall = CircuitBreaker.decorateCallable(
            circuitBreaker,
            () -> {
                var request = HttpRequest.newBuilder()
                    .uri(URI.create("https://external-api.com/data"))
                    .GET().build();
                return httpClient.send(request,
                    HttpResponse.BodyHandlers.ofString()).body();
            }
        );

        try {
            return decoratedCall.call();
        } catch (CallNotPermittedException e) {
            throw new RuntimeException("Circuit breaker is OPEN", e);
        }
    }

    public CircuitBreaker.State getState() {
        return circuitBreaker.getState();
    }
}
from enum import Enum
from time import time
from typing import Callable, TypeVar

T = TypeVar("T")


class CircuitState(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 60.0,
    ):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout

    async def execute(self, fn: Callable) -> T:
        if self.state == CircuitState.OPEN:
            if time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise RuntimeError("Circuit breaker is OPEN")

        try:
            result = await fn()
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED

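    def _on_failure(self):
        self.last_failure_time = time()
        self.failure_count += 1
        # Any failure while HALF_OPEN reopens the circuit immediately
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN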


# Usage
import aiohttp

breaker = CircuitBreaker()


async def call_external_api() -> dict:
    async def _fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get("https://external-api.com/data") as r:
                return await r.json()

    return await breaker.execute(_fetch)
using Polly;
using Polly.CircuitBreaker;

public class ExternalAPIService
{
    private readonly AsyncCircuitBreakerPolicy _circuitBreaker;
    private readonly HttpClient _httpClient;

    public ExternalAPIService(HttpClient httpClient)
    {
        _httpClient = httpClient;

        _circuitBreaker = Policy
            .Handle<HttpRequestException>()
            .Or<TaskCanceledException>()
            .CircuitBreakerAsync(
                exceptionsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromSeconds(60),
                onBreak: (ex, duration) =>
                    Console.WriteLine($"Circuit OPEN for {duration.TotalSeconds}s: {ex.Message}"),
                onReset: () => Console.WriteLine("Circuit CLOSED"),
                onHalfOpen: () => Console.WriteLine("Circuit HALF_OPEN")
            );
    }

    public async Task<string> CallExternalAPIAsync()
    {
        try
        {
            return await _circuitBreaker.ExecuteAsync(async () =>
            {
                var response = await _httpClient.GetAsync("https://external-api.com/data");
                response.EnsureSuccessStatusCode();
                return await response.Content.ReadAsStringAsync();
            });
        }
        catch (BrokenCircuitException ex)
        {
            throw new InvalidOperationException("Circuit breaker is OPEN", ex);
        }
    }

    public CircuitState GetState() => _circuitBreaker.CircuitState;
}
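
The three states are easier to reason about when the transitions are written as a pure function, separate from timers and network calls. The sketch below is one possible way to do that, using the same thresholds as the hand-written examples above (5 failures to open, 2 successes to close). The names `transition`, `Breaker`, and `Signal` are hypothetical, and the `timeoutElapsed` signal stands in for the 60-second clock check.

```typescript
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';
type Signal = 'success' | 'failure' | 'timeoutElapsed';

interface Breaker {
  state: BreakerState;
  failures: number;  // consecutive failures while CLOSED
  successes: number; // consecutive successes while HALF_OPEN
}

const FAILURE_THRESHOLD = 5;
const SUCCESS_THRESHOLD = 2;

// Pure transition function: (current breaker, signal) -> next breaker
function transition(b: Breaker, signal: Signal): Breaker {
  switch (b.state) {
    case 'CLOSED':
      if (signal === 'success') return { ...b, failures: 0 };
      if (signal === 'failure') {
        const failures = b.failures + 1;
        return failures >= FAILURE_THRESHOLD
          ? { state: 'OPEN', failures: 0, successes: 0 }
          : { ...b, failures };
      }
      return b;
    case 'OPEN':
      // Calls fail fast while OPEN; only the timeout moves the breaker on
      return signal === 'timeoutElapsed'
        ? { state: 'HALF_OPEN', failures: 0, successes: 0 }
        : b;
    case 'HALF_OPEN':
      if (signal === 'failure') return { state: 'OPEN', failures: 0, successes: 0 };
      if (signal === 'success') {
        const successes = b.successes + 1;
        return successes >= SUCCESS_THRESHOLD
          ? { state: 'CLOSED', failures: 0, successes: 0 }
          : { ...b, successes };
      }
      return b;
  }
}

// Replay a sequence of signals from a fresh, closed breaker
function run(signals: Signal[]): Breaker {
  const initial: Breaker = { state: 'CLOSED', failures: 0, successes: 0 };
  return signals.reduce(transition, initial);
}

const fiveFailures: Signal[] = ['failure', 'failure', 'failure', 'failure', 'failure'];
console.log(run(fiveFailures).state);
console.log(run([...fiveFailures, 'timeoutElapsed', 'success', 'failure']).state);
console.log(run([...fiveFailures, 'timeoutElapsed', 'success', 'success']).state);
```

Keeping the transitions pure makes the edge cases, such as a failure arriving after one successful probe, straightforward to cover in unit tests without real timeouts.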

CQRS: Separating Reads from Writes

Command Query Responsibility Segregation (CQRS) separates read and write operations so that each side can be optimized independently.

┌─────────────────────────────┐
│        User Request         │
└──────────────┬──────────────┘
               │
       ┌───────┴───────┐
       │               │
       ▼ (Write)       ▼ (Read)
 ┌───────────┐   ┌───────────┐
 │  Command  │   │   Query   │
 │  Handler  │   │  Handler  │
 └─────┬─────┘   └─────┬─────┘
       │               │
       ▼               ▼
 ┌───────────┐   ┌───────────┐
 │   Event   │   │Read Model │
 │ Sourcing  │   │(Optimized)│
 └───────────┘   └───────────┘
// CQRS pattern implementation
interface Command {
  type: string;
  payload: any;
}

interface Query {
  type: string;
  payload: any;
}

// Write side: handle commands
class CommandHandler {
  async handle(command: Command): Promise<void> {
    switch (command.type) {
      case 'CreateUser':
        await this.createUser(command.payload);
        break;
      case 'UpdateUser':
        await this.updateUser(command.payload);
        break;
    }
  }

  private async createUser(data: any): Promise<void> {
    // Write to authoritative source
    const user = await database.query(
      'INSERT INTO users (email, name) VALUES (?, ?)',
      [data.email, data.name]
    );

    // Emit event for read model sync
    await eventBus.publish('UserCreated', { userId: user.id, ...data });
  }

  private async updateUser(data: any): Promise<void> {
    // Keep userId out of the SET clause; it belongs only in WHERE
    const { userId, ...fields } = data;
    await database.query('UPDATE users SET ? WHERE id = ?', [fields, userId]);
    await eventBus.publish('UserUpdated', data);
  }
}

// Read side: handle queries (optimized for reads)
class QueryHandler {
  constructor(private readModelDb: any) {}

  async handle(query: Query): Promise<any> {
    switch (query.type) {
      case 'GetUserProfile':
        return this.readModelDb.query(
          'SELECT * FROM user_profiles WHERE id = ?',
          [query.payload.userId]
        );
      case 'ListUsers':
        return this.readModelDb.query(
          'SELECT * FROM user_profiles LIMIT ? OFFSET ?',
          [query.payload.limit, query.payload.offset]
        );
    }
  }
}

// Event synchronization: update read model from write model events
async function syncReadModel(event: any): Promise<void> {
  if (event.type === 'UserCreated') {
    await readModelDb.query(
      'INSERT INTO user_profiles (id, email, name) VALUES (?, ?, ?)',
      [event.userId, event.email, event.name]
    );
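  } else if (event.type === 'UserUpdated') {
    // Strip routing fields so only profile columns reach the SET clause
    const { type, userId, ...fields } = event;
    await readModelDb.query('UPDATE user_profiles SET ? WHERE id = ?', [
      fields,
      userId,
    ]);
  }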
}
import org.springframework.stereotype.*;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.context.ApplicationEventPublisher;
import java.util.List;

// Command (write side)
public record CreateUserCommand(String email, String name) {}
public record UpdateUserCommand(String userId, String email, String name) {}

// Query (read side)
public record GetUserProfileQuery(String userId) {}
public record ListUsersQuery(int limit, int offset) {}

@Service
public class UserCommandHandler {

    private final UserRepository writeRepo;
    private final ApplicationEventPublisher eventPublisher;

    public UserCommandHandler(UserRepository writeRepo,
                               ApplicationEventPublisher eventPublisher) {
        this.writeRepo = writeRepo;
        this.eventPublisher = eventPublisher;
    }

    public void handle(CreateUserCommand cmd) {
        User user = new User(cmd.email(), cmd.name());
        writeRepo.save(user);
        eventPublisher.publishEvent(new UserCreatedEvent(user.getId(), cmd.email(), cmd.name()));
    }

    public void handle(UpdateUserCommand cmd) {
        User user = writeRepo.findById(cmd.userId())
            .orElseThrow(() -> new RuntimeException("User not found"));
        user.setEmail(cmd.email());
        user.setName(cmd.name());
        writeRepo.save(user);
        eventPublisher.publishEvent(new UserUpdatedEvent(cmd.userId(), cmd.email(), cmd.name()));
    }
}

@Service
public class UserQueryHandler {

    private final UserProfileRepository readRepo;

    public UserQueryHandler(UserProfileRepository readRepo) {
        this.readRepo = readRepo;
    }

    public UserProfile handle(GetUserProfileQuery query) {
        return readRepo.findById(query.userId())
            .orElseThrow(() -> new RuntimeException("Profile not found"));
    }

    public List<UserProfile> handle(ListUsersQuery query) {
        return readRepo.findAllWithPagination(query.limit(), query.offset());
    }
}

// Event listener to sync read model
@Component
class ReadModelSynchronizer {

    private final UserProfileRepository profileRepo;

    ReadModelSynchronizer(UserProfileRepository profileRepo) {
        this.profileRepo = profileRepo;
    }

    @org.springframework.context.event.EventListener
    public void onUserCreated(UserCreatedEvent event) {
        profileRepo.save(new UserProfile(event.userId(), event.email(), event.name()));
    }

    @org.springframework.context.event.EventListener
    public void onUserUpdated(UserUpdatedEvent event) {
        profileRepo.findById(event.userId()).ifPresent(p -> {
            p.setEmail(event.email());
            p.setName(event.name());
            profileRepo.save(p);
        });
    }
}
from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Any

# Commands and Queries
@dataclass
class CreateUserCommand:
    email: str
    name: str

@dataclass
class UpdateUserCommand:
    user_id: str
    email: str
    name: str

@dataclass
class GetUserProfileQuery:
    user_id: str

@dataclass
class ListUsersQuery:
    limit: int
    offset: int


# Write side: Command Handler
class UserCommandHandler:
    def __init__(self, write_db, event_bus):
        self.write_db = write_db
        self.event_bus = event_bus

    async def handle_create(self, cmd: CreateUserCommand) -> None:
        user = await self.write_db.execute(
            "INSERT INTO users (email, name) VALUES (%s, %s) RETURNING id",
            [cmd.email, cmd.name],
        )
        await self.event_bus.publish("UserCreated", {
            "userId": user["id"], "email": cmd.email, "name": cmd.name
        })

    async def handle_update(self, cmd: UpdateUserCommand) -> None:
        await self.write_db.execute(
            "UPDATE users SET email=%s, name=%s WHERE id=%s",
            [cmd.email, cmd.name, cmd.user_id],
        )
        await self.event_bus.publish("UserUpdated", {
            "userId": cmd.user_id, "email": cmd.email, "name": cmd.name
        })


# Read side: Query Handler (optimized for reads)
class UserQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle_get_profile(self, query: GetUserProfileQuery) -> dict:
        return await self.read_db.fetch_one(
            "SELECT * FROM user_profiles WHERE id = %s", [query.user_id]
        )

    async def handle_list_users(self, query: ListUsersQuery) -> list[dict]:
        return await self.read_db.fetch_all(
            "SELECT * FROM user_profiles LIMIT %s OFFSET %s",
            [query.limit, query.offset],
        )


# Event synchronization
async def sync_read_model(event: dict, read_db) -> None:
    if event["type"] == "UserCreated":
        await read_db.execute(
            "INSERT INTO user_profiles (id, email, name) VALUES (%s, %s, %s)",
            [event["userId"], event["email"], event["name"]],
        )
    elif event["type"] == "UserUpdated":
        await read_db.execute(
            "UPDATE user_profiles SET email=%s, name=%s WHERE id=%s",
            [event["email"], event["name"], event["userId"]],
        )
using MediatR;
using Microsoft.EntityFrameworkCore;

// Commands
public record CreateUserCommand(string Email, string Name) : IRequest;
public record UpdateUserCommand(string UserId, string Email, string Name) : IRequest;

// Queries
public record GetUserProfileQuery(string UserId) : IRequest<UserProfile?>;
public record ListUsersQuery(int Limit, int Offset) : IRequest<List<UserProfile>>;

// Write side: Command Handlers
public class CreateUserCommandHandler : IRequestHandler<CreateUserCommand>
{
    private readonly WriteDbContext _writeDb;
    private readonly IPublisher _eventBus;

    public CreateUserCommandHandler(WriteDbContext writeDb, IPublisher eventBus)
    { _writeDb = writeDb; _eventBus = eventBus; }

    public async Task Handle(CreateUserCommand cmd, CancellationToken ct)
    {
        var user = new User { Email = cmd.Email, Name = cmd.Name };
        _writeDb.Users.Add(user);
        await _writeDb.SaveChangesAsync(ct);
        await _eventBus.Publish(new UserCreatedEvent(user.Id, cmd.Email, cmd.Name), ct);
    }
}

public class UpdateUserCommandHandler : IRequestHandler<UpdateUserCommand>
{
    private readonly WriteDbContext _writeDb;
    private readonly IPublisher _eventBus;

    public UpdateUserCommandHandler(WriteDbContext writeDb, IPublisher eventBus)
    { _writeDb = writeDb; _eventBus = eventBus; }

    public async Task Handle(UpdateUserCommand cmd, CancellationToken ct)
    {
        var user = await _writeDb.Users.FindAsync(new object[] { cmd.UserId }, ct)
            ?? throw new KeyNotFoundException();
        user.Email = cmd.Email; user.Name = cmd.Name;
        await _writeDb.SaveChangesAsync(ct);
        await _eventBus.Publish(new UserUpdatedEvent(cmd.UserId, cmd.Email, cmd.Name), ct);
    }
}

// Read side: Query Handlers (optimized for reads)
public class GetUserProfileQueryHandler : IRequestHandler<GetUserProfileQuery, UserProfile?>
{
    private readonly ReadDbContext _readDb;
    public GetUserProfileQueryHandler(ReadDbContext readDb) => _readDb = readDb;

    public async Task<UserProfile?> Handle(GetUserProfileQuery query, CancellationToken ct) =>
        await _readDb.UserProfiles.FindAsync(new object[] { query.UserId }, ct);
}

// Event handler to sync read model
public class ReadModelSynchronizer :
    INotificationHandler<UserCreatedEvent>,
    INotificationHandler<UserUpdatedEvent>
{
    private readonly ReadDbContext _readDb;
    public ReadModelSynchronizer(ReadDbContext readDb) => _readDb = readDb;

    public async Task Handle(UserCreatedEvent evt, CancellationToken ct)
    {
        _readDb.UserProfiles.Add(new UserProfile
            { Id = evt.UserId, Email = evt.Email, Name = evt.Name });
        await _readDb.SaveChangesAsync(ct);
    }

    public async Task Handle(UserUpdatedEvent evt, CancellationToken ct)
    {
        var profile = await _readDb.UserProfiles.FindAsync(new object[] { evt.UserId }, ct);
        if (profile is null) return;
        profile.Email = evt.Email; profile.Name = evt.Name;
        await _readDb.SaveChangesAsync(ct);
    }
}
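
To see the whole loop without a database or a broker, here is a minimal in-memory sketch of the same idea: commands change the write side, the write side emits events, and the read side builds a denormalized view from those events alone. `WriteSide`, `ReadSide`, and the event shapes are hypothetical and exist only for this illustration. Delivery is synchronous here, whereas a real system would usually deliver events asynchronously and the read model would lag slightly behind.

```typescript
// Events emitted by the write side (shapes are illustrative)
type UserEvent =
  | { type: 'UserCreated'; userId: number; email: string; name: string }
  | { type: 'UserUpdated'; userId: number; name: string };

type Listener = (event: UserEvent) => void;

// Write side: owns the authoritative records and emits events
class WriteSide {
  private users = new Map<number, { email: string; name: string }>();
  private listeners: Listener[] = [];
  private nextId = 1;

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }

  createUser(email: string, name: string): number {
    const userId = this.nextId++;
    this.users.set(userId, { email, name });
    this.emit({ type: 'UserCreated', userId, email, name });
    return userId;
  }

  renameUser(userId: number, name: string): void {
    const user = this.users.get(userId);
    if (!user) throw new Error(`Unknown user ${userId}`);
    user.name = name;
    this.emit({ type: 'UserUpdated', userId, name });
  }

  private emit(event: UserEvent): void {
    for (const listener of this.listeners) listener(event);
  }
}

// Read side: a denormalized view that is changed only by events
class ReadSide {
  private profiles = new Map<number, { email: string; displayName: string }>();

  apply(event: UserEvent): void {
    if (event.type === 'UserCreated') {
      this.profiles.set(event.userId, {
        email: event.email,
        displayName: `${event.name} <${event.email}>`,
      });
    } else {
      const profile = this.profiles.get(event.userId);
      if (profile) profile.displayName = `${event.name} <${profile.email}>`;
    }
  }

  getProfile(userId: number) {
    return this.profiles.get(userId);
  }
}

const writeSide = new WriteSide();
const readSide = new ReadSide();
writeSide.subscribe((event) => readSide.apply(event));

const id = writeSide.createUser('user@example.com', 'John Doe');
writeSide.renameUser(id, 'John D.');
console.log(readSide.getProfile(id)?.displayName); // "John D. <user@example.com>"
```

The read side never queries the write store; it precomputes `displayName` when events arrive, so reads become a simple lookup.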

Event Sourcing: Immutable History

Event Sourcing stores every change as an immutable event, which enables complete audit trails and time-travel debugging.

// Event sourcing implementation
interface Event {
  id: string;
  aggregateId: string;
  type: string;
  timestamp: Date;
  data: any;
  version: number;
}

class EventStore {
  private events: Event[] = [];
  private eventId = 0;

  async append(aggregateId: string, eventType: string, data: any): Promise<void> {
    const event: Event = {
      id: `event_${++this.eventId}`,
      aggregateId,
      type: eventType,
      timestamp: new Date(),
      data,
      version: await this.getVersion(aggregateId) + 1,
    };

    // Store immutably
    this.events.push(event);
    
    // Publish event for subscribers
    await this.publishEvent(event);
  }

  async getEvents(aggregateId: string): Promise<Event[]> {
    return this.events.filter(e => e.aggregateId === aggregateId);
  }

  async getVersion(aggregateId: string): Promise<number> {
    const events = await this.getEvents(aggregateId);
    return events.length > 0 ? events[events.length - 1].version : 0;
  }

  async getAggregate(aggregateId: string): Promise<any> {
    const events = await this.getEvents(aggregateId);
    let state = {};

    for (const event of events) {
      state = this.applyEvent(state, event);
    }

    return state;
  }

  private applyEvent(state: any, event: Event): any {
    switch (event.type) {
      case 'UserCreated':
        return { ...state, id: event.aggregateId, ...event.data };
      case 'UserProfileUpdated':
        return { ...state, ...event.data };
      case 'UserDeleted':
        return { ...state, deleted: true };
      default:
        return state;
    }
  }

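  private async publishEvent(event: Event): Promise<void> {
    // Publish to message queue for consumers.
    // `eventBus` is assumed to be an application-level message bus client
    // defined elsewhere; it is not part of this class.
    await eventBus.publish(event.type, event);
  }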
}

// Usage
const eventStore = new EventStore();

// Record events
await eventStore.append('user_123', 'UserCreated', {
  email: 'user@example.com',
  name: 'John Doe',
});

await eventStore.append('user_123', 'UserProfileUpdated', {
  bio: 'Software engineer',
});

// Reconstruct the current state by replaying every event for the aggregate
const currentState = await eventStore.getAggregate('user_123');
console.log(currentState);
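
The usage above replays the full history, so it always returns the latest state. Time-travel debugging needs one more step: stop the replay at a chosen moment. The sketch below is one way to do that, assuming events carry the same `timestamp` and `version` fields as the `Event` interface above. The names `StoredEvent`, `State`, and `getAggregateAt` are hypothetical and exist only for this illustration.

```typescript
// Hypothetical, simplified event shape for this sketch (mirrors the Event interface above).
interface StoredEvent {
  aggregateId: string;
  type: string;
  timestamp: Date;
  data: Record<string, unknown>;
  version: number;
}

type State = Record<string, unknown>;

// Pure reducer: folds one event into the previous state without mutating it.
function applyEvent(state: State, event: StoredEvent): State {
  switch (event.type) {
    case 'UserCreated':
      return { ...state, id: event.aggregateId, ...event.data };
    case 'UserProfileUpdated':
      return { ...state, ...event.data };
    case 'UserDeleted':
      return { ...state, deleted: true };
    default:
      return state;
  }
}

// Replays only the events recorded at or before `asOf`, in version order.
function getAggregateAt(events: StoredEvent[], aggregateId: string, asOf: Date): State {
  return events
    .filter(e => e.aggregateId === aggregateId && e.timestamp.getTime() <= asOf.getTime())
    .sort((a, b) => a.version - b.version)
    .reduce(applyEvent, {} as State);
}

// Example history with fixed timestamps so the result is deterministic.
const history: StoredEvent[] = [
  {
    aggregateId: 'user_123',
    type: 'UserCreated',
    timestamp: new Date('2025-01-01T00:00:00Z'),
    data: { email: 'user@example.com', name: 'John Doe' },
    version: 1,
  },
  {
    aggregateId: 'user_123',
    type: 'UserProfileUpdated',
    timestamp: new Date('2025-02-01T00:00:00Z'),
    data: { bio: 'Software engineer' },
    version: 2,
  },
];

// State as it was in mid-January: the profile update has not happened yet.
const stateInJanuary = getAggregateAt(history, 'user_123', new Date('2025-01-15T00:00:00Z'));
// State in March: both events have been applied.
const stateInMarch = getAggregateAt(history, 'user_123', new Date('2025-03-01T00:00:00Z'));
```

Keeping the reducer pure is what makes this cheap to add: the same `applyEvent` function serves both the "latest state" and the "state as of" queries, and only the set of events fed into it changes.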

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Service;
import jakarta.persistence.*;
import java.time.*;
import java.util.*;

@Entity
@Table(name = "domain_events")
public class DomainEvent {
    @Id private String id;
    private String aggregateId;
    private String type;
    private Instant timestamp;
    @Lob private String data; // event payload serialized as JSON
    private long version;
    // getters/setters omitted for brevity
}

// Spring Data derives the query from the method name.
public interface DomainEventRepository extends JpaRepository<DomainEvent, String> {
    List<DomainEvent> findByAggregateIdOrderByVersionAsc(String aggregateId);
}

@Service
public class EventStore {

    private final DomainEventRepository repository;
    private final ApplicationEventPublisher publisher;

    public EventStore(DomainEventRepository repository,
                      ApplicationEventPublisher publisher) {
        this.repository = repository;
        this.publisher = publisher;
    }

    public void append(String aggregateId, String eventType, String dataJson) {
        long version = getVersion(aggregateId) + 1;

        DomainEvent event = new DomainEvent();
        // A random UUID stays unique across restarts and multiple instances;
        // an in-memory counter would restart at 1 and collide with stored ids.
        event.setId(UUID.randomUUID().toString());
        event.setAggregateId(aggregateId);
        event.setType(eventType);
        event.setTimestamp(Instant.now());
        event.setData(dataJson);
        event.setVersion(version);

        repository.save(event);
        publisher.publishEvent(event); // fan out to in-process listeners
    }

    public List<DomainEvent> getEvents(String aggregateId) {
        return repository.findByAggregateIdOrderByVersionAsc(aggregateId);
    }

    public long getVersion(String aggregateId) {
        return getEvents(aggregateId).stream()
            .mapToLong(DomainEvent::getVersion).max().orElse(0L);
    }

    public Map<String, Object> getAggregate(String aggregateId) throws Exception {
        var events = getEvents(aggregateId);
        Map<String, Object> state = new HashMap<>();
        var mapper = new com.fasterxml.jackson.databind.ObjectMapper();

        for (var event : events) {
            Map<String, Object> data = mapper.readValue(event.getData(), Map.class);
            switch (event.getType()) {
                case "UserCreated" -> { state.put("id", aggregateId); state.putAll(data); }
                case "UserProfileUpdated" -> state.putAll(data);
                case "UserDeleted" -> state.put("deleted", true);
            }
        }
        return state;
    }
}

import json
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any


@dataclass
class DomainEvent:
    id: str
    aggregate_id: str
    type: str
    timestamp: str
    data: dict
    version: int


class EventStore:
    def __init__(self, db, event_bus):
        self._db = db
        self._event_bus = event_bus

    async def append(self, aggregate_id: str, event_type: str, data: dict) -> None:
        version = await self.get_version(aggregate_id) + 1
        event = DomainEvent(
            id=str(uuid.uuid4()),
            aggregate_id=aggregate_id,
            type=event_type,
            timestamp=datetime.now(timezone.utc).isoformat(),
            data=data,
            version=version,
        )
        await self._db.execute(
            "INSERT INTO domain_events (id, aggregate_id, type, timestamp, data, version) "
            "VALUES (%s, %s, %s, %s, %s, %s)",
            [event.id, event.aggregate_id, event.type,
             event.timestamp, json.dumps(event.data), event.version],
        )
        await self._event_bus.publish(event.type, asdict(event))

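    async def get_events(self, aggregate_id: str) -> list[DomainEvent]:
        rows = await self._db.fetch_all(
            "SELECT * FROM domain_events WHERE aggregate_id = %s ORDER BY version",
            [aggregate_id],
        )
        # append() writes `data` with json.dumps, so decode it on read
        # (assumes the driver returns the column as JSON text).
        return [
            DomainEvent(**{**dict(row), "data": json.loads(row["data"])})
            for row in rows
        ]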

    async def get_version(self, aggregate_id: str) -> int:
        events = await self.get_events(aggregate_id)
        return events[-1].version if events else 0

    async def get_aggregate(self, aggregate_id: str) -> dict:
        events = await self.get_events(aggregate_id)
        state: dict[str, Any] = {}

        for event in events:
            match event.type:
                case "UserCreated":
                    state = {"id": aggregate_id, **event.data}
                case "UserProfileUpdated":
                    state.update(event.data)
                case "UserDeleted":
                    state["deleted"] = True

        return state


# Usage (async context)
# event_store = EventStore(db, event_bus)
# await event_store.append("user_123", "UserCreated", {"email": "user@example.com", "name": "John Doe"})
# await event_store.append("user_123", "UserProfileUpdated", {"bio": "Software engineer"})
# state = await event_store.get_aggregate("user_123")
# print(state)

using MediatR; // assumed: IPublisher below is MediatR's publisher interface
using Microsoft.EntityFrameworkCore;
using System.Text.Json;

public class DomainEventEntity
{
    public string Id { get; set; } = "";
    public string AggregateId { get; set; } = "";
    public string Type { get; set; } = "";
    public DateTime Timestamp { get; set; }
    public string Data { get; set; } = "";
    public long Version { get; set; }
}

public class EventStore
{
    private readonly EventStoreDbContext _db;
    private readonly IPublisher _eventBus;

    public EventStore(EventStoreDbContext db, IPublisher eventBus)
    { _db = db; _eventBus = eventBus; }

    public async Task AppendAsync(string aggregateId, string eventType, object data, CancellationToken ct = default)
    {
        var version = await GetVersionAsync(aggregateId, ct) + 1;

        var @event = new DomainEventEntity
        {
            Id = Guid.NewGuid().ToString(),
            AggregateId = aggregateId,
            Type = eventType,
            Timestamp = DateTime.UtcNow,
            Data = JsonSerializer.Serialize(data),
            Version = version
        };

        _db.Events.Add(@event);
        await _db.SaveChangesAsync(ct);
        await _eventBus.Publish(new DomainEventNotification(@event), ct);
    }

    public async Task<List<DomainEventEntity>> GetEventsAsync(string aggregateId, CancellationToken ct = default) =>
        await _db.Events
            .Where(e => e.AggregateId == aggregateId)
            .OrderBy(e => e.Version)
            .ToListAsync(ct);

    public async Task<long> GetVersionAsync(string aggregateId, CancellationToken ct = default)
    {
        var events = await GetEventsAsync(aggregateId, ct);
        return events.Count > 0 ? events[^1].Version : 0L;
    }

    public async Task<Dictionary<string, object?>> GetAggregateAsync(
        string aggregateId, CancellationToken ct = default)
    {
        var events = await GetEventsAsync(aggregateId, ct);
        var state = new Dictionary<string, object?>();

        foreach (var @event in events)
        {
            var data = JsonSerializer.Deserialize<Dictionary<string, object?>>(@event.Data) ?? new();
            switch (@event.Type)
            {
                case "UserCreated":
                    state["id"] = aggregateId;
                    foreach (var kv in data) state[kv.Key] = kv.Value;
                    break;
                case "UserProfileUpdated":
                    foreach (var kv in data) state[kv.Key] = kv.Value;
                    break;
                case "UserDeleted":
                    state["deleted"] = true;
                    break;
            }
        }

        return state;
    }
}

// Usage:
// await eventStore.AppendAsync("user_123", "UserCreated", new { email = "user@example.com", name = "John Doe" });
// await eventStore.AppendAsync("user_123", "UserProfileUpdated", new { bio = "Software engineer" });
// var state = await eventStore.GetAggregateAsync("user_123");
// Console.WriteLine(JsonSerializer.Serialize(state));
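
One thing the implementations above leave open is concurrency: each `append` reads the current version and then writes version + 1, so two writers working on the same aggregate at the same time could both produce the same version number. A common guard is optimistic concurrency, where the caller states which version it expects and the store rejects the write if the stream has moved on. The following is a minimal in-memory sketch of that idea. `VersionCheckedStore`, `ConcurrencyError`, and the `expectedVersion` parameter are hypothetical names for this illustration and are not part of the code above. In a real database the same rule is usually enforced with a unique constraint on the aggregate id and version pair.

```typescript
// Hypothetical error type used to signal a lost race.
class ConcurrencyError extends Error {}

interface VersionedEvent {
  aggregateId: string;
  type: string;
  data: Record<string, unknown>;
  version: number;
}

class VersionCheckedStore {
  // One ordered event stream per aggregate.
  private readonly streams = new Map<string, VersionedEvent[]>();

  // Versions start at 1, so the stream length is the current version.
  currentVersion(aggregateId: string): number {
    return this.streams.get(aggregateId)?.length ?? 0;
  }

  // Appends only if the caller has seen the latest version; otherwise throws.
  append(
    aggregateId: string,
    type: string,
    data: Record<string, unknown>,
    expectedVersion: number,
  ): VersionedEvent {
    const stream = this.streams.get(aggregateId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `expected version ${expectedVersion}, found ${stream.length}`,
      );
    }
    const event: VersionedEvent = { aggregateId, type, data, version: expectedVersion + 1 };
    stream.push(event);
    this.streams.set(aggregateId, stream);
    return event;
  }
}

// Two writers both read version 0, but only the first append succeeds.
const checkedStore = new VersionCheckedStore();
checkedStore.append('user_123', 'UserCreated', { name: 'John Doe' }, 0);

let staleWriteRejected = false;
try {
  checkedStore.append('user_123', 'UserProfileUpdated', { bio: 'Software engineer' }, 0);
} catch {
  staleWriteRejected = true; // the second writer must reload and retry
}
```

The rejected writer typically reloads the aggregate, re-applies its decision against the new state, and retries with the fresh version.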

Conclusion

These patterns are not one-size-fits-all solutions. They are tools for specific problems. The art of system design lies in recognizing which pattern solves which problem, and when to apply it.

Master these patterns and you will design systems that scale, remain resilient under failure, and evolve gracefully as requirements change. That is what separates senior engineers from the rest.

Written by พลากร วรมงคล

Software Engineer Specialist with more than 20 years of experience, writing about architecture, performance, and building production systems.
