Architecture Backend Kafka Distributed Systems

Event Streaming with Kafka: Technology for Inter-Server Communication

พลากร วรมงคล
April 18, 2025 · 15 min read

“A comprehensive guide to event streaming with Apache Kafka, covering partitions, consumer groups, exactly-once semantics, the Schema Registry, Kafka Streams, and production deployment patterns”

In-Depth Analysis: Event Streaming with Kafka

Event streaming has become the backbone of modern distributed systems, and Apache Kafka stands out as the industry standard for building real-time data pipelines and large-scale streaming applications. This guide explores the architectural foundations, implementation patterns, and production practices that make Kafka indispensable for event-driven architectures.

What Is Event Streaming?

Event streaming is the practice of capturing, processing, and reacting to data in motion, treating events as the source of truth in your system. Unlike traditional message queues, which consume and discard messages, event streaming systems maintain an immutable, append-only log of events that applications can process independently and replay at any time.

The Append-Only Log Model

At its core, Kafka is a distributed, partitioned, replicated commit log. Think of it as an append-only data structure in which events are written sequentially and never modified. This foundational design unlocks several powerful capabilities:

  • Replayability: rewind and reprocess historical events from any point in time
  • Audit trail: a complete record of state changes for compliance and debugging
  • Decoupling: producers and consumers operate independently, with no coordination required
  • Scalability: multiple consumers process the same events without contention
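The replay property above is easiest to see in a toy model. The sketch below is illustrative Python, not Kafka's actual storage engine: an append-only log where offsets are simply positions, so any consumer can re-read history from any point.

```python
class AppendOnlyLog:
    """Toy model of a Kafka partition: an immutable, append-only event log."""

    def __init__(self) -> None:
        self._events: list[dict] = []

    def append(self, event: dict) -> int:
        """Append an event and return its offset (its position in the log)."""
        self._events.append(event)
        return len(self._events) - 1

    def read_from(self, offset: int) -> list[dict]:
        """Replay: read every event from `offset` onward; nothing is ever deleted."""
        return self._events[offset:]


log = AppendOnlyLog()
for order_id in ("order-1", "order-2", "order-3"):
    log.append({"orderId": order_id})

# A new consumer can replay the full history...
assert len(log.read_from(0)) == 3
# ...while another resumes mid-stream from offset 2.
assert log.read_from(2) == [{"orderId": "order-3"}]
```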

Topics, Partitions, และ Consumer Groups

A Kafka topic is a named feed of events organized as an ordered sequence. Topics are partitioned across multiple brokers for parallelism; each partition is an independent append-only log. Events are assigned to partitions by a partitioning strategy (typically a hash of the key), which guarantees ordering within a partition.
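The key-to-partition rule can be sketched in a few lines. Kafka's default partitioner actually uses a murmur2 hash; the md5-based stand-in below only illustrates the principle that equal keys always map to the same partition.

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Deterministically map a key to a partition (stand-in for murmur2)."""
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# The same key always lands on the same partition, preserving per-key ordering.
assert partition_for("customer-123", 6) == partition_for("customer-123", 6)

# Many distinct keys spread across the available partitions.
used = {partition_for(f"customer-{i}", 6) for i in range(100)}
assert used.issubset(range(6))
```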

A consumer group is a set of consumers that cooperate to process a topic. Kafka assigns partitions to the consumers within a group, ensuring each partition is read by exactly one consumer in that group. Different consumer groups can consume the same topic independently, each at its own offsets.

Offset Management and Replay

Each consumer maintains an offset, a position in a partition's log marking the next message to read. By committing offsets, consumers can resume where they left off and reprocess events as needed. This enables replay from a specific point for analytics, backfilling a new service, or recovering from processing failures.
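Commit-and-resume can be sketched the same way (a conceptual model; real consumers commit offsets back to Kafka, not to a local variable): the committed offset is the next message to read, so a restarted consumer picks up exactly where the last commit left off.

```python
events = [f"event-{i}" for i in range(10)]
committed_offset = 0  # the next message to read

def poll_and_process(batch_size: int) -> list[str]:
    """Process a batch, then commit the offset of the next unread message."""
    global committed_offset
    batch = events[committed_offset : committed_offset + batch_size]
    committed_offset += len(batch)  # commit AFTER successful processing
    return batch

processed = poll_and_process(4)
assert processed == ["event-0", "event-1", "event-2", "event-3"]

# Simulate a crash and restart: processing resumes from the committed offset.
assert poll_and_process(4) == ["event-4", "event-5", "event-6", "event-7"]
```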

How It Differs from Traditional Message Queues

Traditional message queues (RabbitMQ, ActiveMQ) delete messages after delivery. Kafka retains messages up to a configurable time or size limit, allowing multiple consumers to process the same stream independently and at different rates. This retention model fundamentally changes how you build systems, unlocking patterns such as event sourcing and CQRS.

sequenceDiagram
    participant P as Producer
    participant T as Kafka Topic/Partition
    participant A as Consumer Group A
    participant B as Consumer Group B

    P->>T: Publish Event 1
    P->>T: Publish Event 2
    P->>T: Publish Event 3

    T->>A: Read at offset 0
    A->>A: Process Event 1

    T->>B: Read at offset 0
    B->>B: Process Event 1

    T->>A: Read at offset 1
    A->>A: Process Event 2

    T->>B: Read at offset 1
    B->>B: Process Event 2

    Note over A,B: Both groups independently<br/>process the same events

Core Principles

  • Immutability: events are written once and never modified, creating a trustworthy record
  • Ordering: events maintain strict order within a partition; global ordering requires a single partition
  • Durability: configurable replication ensures fault tolerance and data persistence
  • Scalability: partitioning enables near-linear scaling of throughput and processing capacity
  • Decoupling: producers and consumers are independent; a failure in one does not directly affect the other
  • Retention: messages persist for the retention period, enabling replay and onboarding of new consumers
  • Replay: rewind consumer offsets to reprocess events, enabling recovery and analysis

Kafka Architecture

Brokers and Clusters

A Kafka cluster consists of multiple brokers (servers) working together to store and serve events. Each broker hosts a subset of partitions and serves requests from producers and consumers. The cluster automatically elects a controller broker, which coordinates administrative tasks such as partition assignment and leader election.

Topics และ Partitions

A topic is a named, logical grouping of events with its own configuration (replication factor, retention policy, compression). Topics are split into multiple partitions, and each partition is replicated across multiple brokers for fault tolerance. A replication factor of 3, the common choice, means each partition has one leader replica and two follower replicas.

Replication และ In-Sync Replicas (ISR)

The ISR (In-Sync Replicas) set contains the leader and every follower that is fully caught up with it. When you write to a partition with acks=all, the broker waits for acknowledgment from all ISRs, which guarantees durability. If a broker fails, one of the ISR followers automatically becomes the new leader.

This replication strategy provides:

  • Fault tolerance: broker failures are survived without data loss
  • Availability: automatic failover when brokers go down
  • Tunable durability: trade latency for consistency via the acks configuration
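The acks trade-off can be modeled as a toy function (illustrative only, not broker code): with acks=all a write is acknowledged only once the whole ISR has it, while acks=1 returns as soon as the leader does.

```python
def write(acks: str, leader_ok: bool, follower_acks: list[bool]) -> bool:
    """Toy acknowledgment logic for a 3-replica partition (1 leader, 2 followers)."""
    if not leader_ok:
        return False
    if acks == "1":
        return True                # leader only: lower latency, weaker durability
    if acks == "all":
        return all(follower_acks)  # wait for the full ISR: stronger durability
    raise ValueError(f"unsupported acks setting: {acks}")

# acks=1 succeeds even while a follower lags behind...
assert write("1", leader_ok=True, follower_acks=[True, False])
# ...but acks=all refuses, so the producer retries instead of risking data loss.
assert not write("all", leader_ok=True, follower_acks=[True, False])
assert write("all", leader_ok=True, follower_acks=[True, True])
```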

ZooKeeper/KRaft Mode

Modern Kafka (3.3+) uses KRaft (Kafka Raft) mode, which eliminates the external ZooKeeper dependency. KRaft consolidates cluster metadata into Kafka itself, simplifying operations and improving reliability.

Producer Implementation with KafkaJS

Producers send events to Kafka topics. Below is a production-quality implementation using KafkaJS (TypeScript), followed by Spring Kafka (Java), aiokafka (Python), and Confluent.Kafka (C#) equivalents:

import { Kafka, Producer, logLevel, CompressionTypes } from 'kafkajs';

interface ProducerConfig {
  brokers: string[];
  clientId: string;
  idempotent?: boolean;
  maxInFlightRequests?: number;
  compression?: CompressionTypes;
}

class KafkaProducer {
  private kafka: Kafka;
  private producer: Producer;

  constructor(config: ProducerConfig) {
    this.kafka = new Kafka({
      clientId: config.clientId,
      brokers: config.brokers,
      logLevel: logLevel.INFO,
      requestTimeout: 30000,
      ssl: true,
      sasl: {
        mechanism: 'plain',
        username: process.env.KAFKA_USER || '',
        password: process.env.KAFKA_PASSWORD || '',
      },
    });

    this.producer = this.kafka.producer({
      // KafkaJS requires maxInFlightRequests of 1 when idempotent is enabled
      idempotent: config.idempotent ?? true,
      maxInFlightRequests: config.maxInFlightRequests ?? 1,
      // Note: compression is specified per send() call in KafkaJS (see sendBatch)
      retry: {
        initialRetryTime: 100,
        retries: 8,
        factor: 0.2, // randomization factor
      },
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
  }

  // Single event with key-based partitioning
  async sendEvent(topic: string, key: string, value: any): Promise<void> {
    await this.producer.send({
      topic,
      messages: [
        {
          key,
          value: JSON.stringify(value),
          timestamp: Date.now().toString(),
        },
      ],
    });
  }

  // Batch send for higher throughput
  async sendBatch(topic: string, events: Array<{ key: string; value: any }>): Promise<void> {
    await this.producer.send({
      topic,
      messages: events.map(({ key, value }) => ({
        key,
        value: JSON.stringify(value),
        timestamp: Date.now().toString(),
      })),
      // Batch settings for throughput
      timeout: 30000,
      compression: CompressionTypes.Snappy,
    });
  }

  // Transactional writes (exactly-once)
  // Note: KafkaJS transactions require the producer to be created with a
  // transactionalId, e.g. kafka.producer({ transactionalId: 'order-service-tx', idempotent: true })
  async sendTransactional(
    topic: string,
    events: Array<{ key: string; value: any }>
  ): Promise<void> {
    const transaction = await this.producer.transaction();
    try {
      await transaction.send({
        topic,
        messages: events.map(({ key, value }) => ({
          key,
          value: JSON.stringify(value),
        })),
      });
      await transaction.commit();
    } catch (error) {
      await transaction.abort();
      throw error;
    }
  }
}

// Usage example
const producer = new KafkaProducer({
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  clientId: 'order-service',
  idempotent: true,
  compression: CompressionTypes.Snappy,
});

await producer.connect();

// Send order event
await producer.sendEvent('orders', 'order-123', {
  orderId: 'order-123',
  customerId: 'cust-456',
  amount: 99.99,
  timestamp: new Date().toISOString(),
});

await producer.disconnect();
// pom.xml dependency: org.springframework.kafka:spring-kafka
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.SendResult;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.*;
import java.util.concurrent.CompletableFuture;

public class KafkaProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Single event with key-based partitioning
    public CompletableFuture<SendResult<String, String>> sendEvent(
            String topic, String key, Object value) throws Exception {
        String json = objectMapper.writeValueAsString(value);
        return kafkaTemplate.send(topic, key, json);
    }

    // Batch send for higher throughput
    public void sendBatch(String topic, List<Map<String, Object>> events) throws Exception {
        for (Map<String, Object> event : events) {
            String key = (String) event.get("key");
            String json = objectMapper.writeValueAsString(event.get("value"));
            kafkaTemplate.send(topic, key, json);
        }
        kafkaTemplate.flush();
    }

    // Transactional writes (exactly-once)
    public void sendTransactional(String topic, List<Map<String, Object>> events) throws Exception {
        kafkaTemplate.executeInTransaction(kt -> {
            for (Map<String, Object> event : events) {
                try {
                    String key = (String) event.get("key");
                    String json = objectMapper.writeValueAsString(event.get("value"));
                    kt.send(topic, key, json);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return true;
        });
    }
}

// Spring Boot configuration
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        config.put(ProducerConfig.CLIENT_ID_CONFIG, "order-service");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        config.put(ProducerConfig.RETRIES_CONFIG, 8);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config);
        // Required for executeInTransaction() in KafkaProducerService
        factory.setTransactionIdPrefix("order-service-tx-");
        return factory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
# pip install confluent-kafka aiokafka
import asyncio
import json
import os
from dataclasses import dataclass
from typing import Any
from aiokafka import AIOKafkaProducer
from aiokafka.helpers import create_ssl_context

@dataclass
class ProducerConfig:
    brokers: list[str]
    client_id: str
    idempotent: bool = True
    compression: str = "snappy"

class KafkaProducerService:
    def __init__(self, config: ProducerConfig):
        self.config = config
        self.producer: AIOKafkaProducer | None = None

    async def connect(self) -> None:
        ssl_context = create_ssl_context()
        self.producer = AIOKafkaProducer(
            bootstrap_servers=",".join(self.config.brokers),
            client_id=self.config.client_id,
            enable_idempotence=self.config.idempotent,
            compression_type=self.config.compression,
            max_in_flight_requests_per_connection=5,
            request_timeout_ms=30000,
            retry_backoff_ms=100,
            security_protocol="SASL_SSL",  # required for SASL auth over TLS
            ssl_context=ssl_context,
            sasl_mechanism="PLAIN",
            sasl_plain_username=os.getenv("KAFKA_USER", ""),
            sasl_plain_password=os.getenv("KAFKA_PASSWORD", ""),
        )
        await self.producer.start()

    async def disconnect(self) -> None:
        if self.producer:
            await self.producer.stop()

    # Single event with key-based partitioning
    async def send_event(self, topic: str, key: str, value: Any) -> None:
        # aiokafka stamps messages with the current wall-clock time by default;
        # send_and_wait blocks until the broker acknowledges delivery
        await self.producer.send_and_wait(
            topic,
            key=key.encode(),
            value=json.dumps(value).encode(),
        )

    # Batch send for higher throughput
    async def send_batch(self, topic: str, events: list[dict]) -> None:
        batch = self.producer.create_batch()
        for event in events:
            batch.append(
                key=event["key"].encode(),
                value=json.dumps(event["value"]).encode(),
                timestamp=None,
            )
        partitions = await self.producer.partitions_for(topic)
        partition = next(iter(partitions))
        await self.producer.send_batch(batch, topic, partition=partition)

    # Transactional writes (exactly-once)
    # Note: requires AIOKafkaProducer to be created with a transactional_id
    async def send_transactional(self, topic: str, events: list[dict]) -> None:
        async with self.producer.transaction():
            for event in events:
                await self.producer.send(
                    topic,
                    key=event["key"].encode(),
                    value=json.dumps(event["value"]).encode(),
                )


# Usage example
async def main():
    producer = KafkaProducerService(ProducerConfig(
        brokers=["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
        client_id="order-service",
        idempotent=True,
        compression="snappy",
    ))
    await producer.connect()
    await producer.send_event("orders", "order-123", {
        "orderId": "order-123",
        "customerId": "cust-456",
        "amount": 99.99,
        "timestamp": "2024-01-01T00:00:00Z",
    })
    await producer.disconnect()
// NuGet: Confluent.Kafka
using Confluent.Kafka;
using System.Text.Json;

public class ProducerConfig
{
    public string[] Brokers { get; set; } = [];
    public string ClientId { get; set; } = "";
    public bool Idempotent { get; set; } = true;
    public CompressionType Compression { get; set; } = CompressionType.Snappy;
}

public class KafkaProducerService : IDisposable
{
    private readonly IProducer<string, string> _producer;

    public KafkaProducerService(ProducerConfig config)
    {
        var producerConfig = new Confluent.Kafka.ProducerConfig
        {
            BootstrapServers = string.Join(",", config.Brokers),
            ClientId = config.ClientId,
            EnableIdempotence = config.Idempotent,
            CompressionType = config.Compression,
            MaxInFlight = 5,
            MessageTimeoutMs = 30000,
            Acks = Acks.All,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = Environment.GetEnvironmentVariable("KAFKA_USER") ?? "",
            SaslPassword = Environment.GetEnvironmentVariable("KAFKA_PASSWORD") ?? "",
            SecurityProtocol = SecurityProtocol.SaslSsl,
        };
        _producer = new ProducerBuilder<string, string>(producerConfig).Build();
    }

    // Single event with key-based partitioning
    public async Task SendEventAsync(string topic, string key, object value)
    {
        var json = JsonSerializer.Serialize(value);
        await _producer.ProduceAsync(topic, new Message<string, string>
        {
            Key = key,
            Value = json,
            Timestamp = new Timestamp(DateTime.UtcNow),
        });
    }

    // Batch send for higher throughput
    public async Task SendBatchAsync(string topic, IEnumerable<(string Key, object Value)> events)
    {
        foreach (var (key, value) in events)
        {
            var json = JsonSerializer.Serialize(value);
            _producer.Produce(topic, new Message<string, string> { Key = key, Value = json });
        }
        _producer.Flush(TimeSpan.FromSeconds(30));
        await Task.CompletedTask;
    }

    // Transactional writes (exactly-once)
    // Note: requires TransactionalId to be set in ProducerConfig; InitTransactions
    // should run once per producer lifetime, not once per batch
    public async Task SendTransactionalAsync(string topic, IEnumerable<(string Key, object Value)> events)
    {
        _producer.InitTransactions(TimeSpan.FromSeconds(30));
        _producer.BeginTransaction();
        try
        {
            foreach (var (key, value) in events)
            {
                var json = JsonSerializer.Serialize(value);
                _producer.Produce(topic, new Message<string, string> { Key = key, Value = json });
            }
            _producer.CommitTransaction();
        }
        catch
        {
            _producer.AbortTransaction();
            throw;
        }
        await Task.CompletedTask;
    }

    public void Dispose() => _producer.Dispose();
}

Partitioning Strategies

The partition key determines which partition receives each event:

// Strategy 1: Hash-based (default) — key is hashed to partition
// Same key always goes to same partition (ordered within key)
await producer.sendEvent('orders', 'customer-123', orderData);

// Strategy 2: Round-robin (null key)
// Distributes events evenly across partitions, but no ordering
await producer.sendEvent('events', null, eventData);

// Strategy 3: Custom strategy — use business domain
// Use customer ID to co-locate related events
const customerId = order.customerId;
await producer.sendEvent('orders', customerId, orderData);
// Strategy 1: Hash-based — same key always goes to same partition
producerService.sendEvent("orders", "customer-123", orderData).get();

// Strategy 2: Round-robin (null key)
producerService.sendEvent("events", null, eventData).get();

// Strategy 3: Custom strategy — use business domain
String customerId = order.getCustomerId();
producerService.sendEvent("orders", customerId, orderData).get();
# Strategy 1: Hash-based — same key always goes to same partition
await producer.send_event("orders", "customer-123", order_data)

# Strategy 2: Round-robin (null key) — distribute evenly
await producer.producer.send("events", value=json.dumps(event_data).encode())

# Strategy 3: Custom strategy — use business domain
customer_id = order["customerId"]
await producer.send_event("orders", customer_id, order_data)
// Strategy 1: Hash-based — same key always goes to same partition
await producerService.SendEventAsync("orders", "customer-123", orderData);

// Strategy 2: Round-robin (null key)
await producerService.SendEventAsync("events", null!, eventData);

// Strategy 3: Custom strategy — use business domain
var customerId = order.CustomerId;
await producerService.SendEventAsync("orders", customerId, orderData);

Key design considerations:

  • Hot partitions: avoid keys that concentrate traffic (all events landing on a single partition)
  • Ordering requirements: use keys to guarantee ordering of related events
  • Distribution: ensure keys spread evenly across partitions for effective load balancing
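One way to catch a hot partition early is to simulate the key distribution offline. The sketch below uses hypothetical customer keys and an md5 hash as a stand-in for Kafka's murmur2 partitioner:

```python
import hashlib
from collections import Counter

def partition_for(key: str, num_partitions: int) -> int:
    """Deterministically map a key to a partition (stand-in for murmur2)."""
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Skewed workload: one big customer dominates the stream.
keys = ["customer-big"] * 900 + [f"customer-{i}" for i in range(100)]
load = Counter(partition_for(k, 6) for k in keys)

# The partition holding "customer-big" takes at least 90% of the traffic:
# a hot partition that a customer-keyed strategy cannot spread out.
assert max(load.values()) >= 900
print(dict(load))
```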

Batching and Compression

Grouping multiple events before sending significantly increases throughput:

// Batch events together
const events: Array<{ key: string; value: any }> = [];

for (let i = 0; i < 1000; i++) {
  events.push({
    key: `customer-${i % 100}`,
    value: { eventId: i, timestamp: Date.now() },
  });
}

// Send batch with compression
await producer.sendBatch('events', events);
List<Map<String, Object>> events = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
    events.add(Map.of(
        "key", "customer-" + (i % 100),
        "value", Map.of("eventId", i, "timestamp", System.currentTimeMillis())
    ));
}
// Send batch with compression
producerService.sendBatch("events", events);
events = [
    {
        "key": f"customer-{i % 100}",
        "value": {"eventId": i, "timestamp": int(asyncio.get_event_loop().time() * 1000)},
    }
    for i in range(1000)
]
# Send batch with compression
await producer.send_batch("events", events)
var events = Enumerable.Range(0, 1000).Select(i => (
    Key: $"customer-{i % 100}",
    Value: (object)new { eventId = i, timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }
));
// Send batch with compression
await producerService.SendBatchAsync("events", events);

Compression reduces network bandwidth and storage:

  • Snappy: good balance of speed and compression ratio (recommended for most workloads)
  • LZ4: fastest, with a lower compression ratio
  • Gzip: best compression, but slower
  • Zstd: modern, with excellent ratio and speed (Kafka 2.1+)
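The ratio side of this trade-off is easy to demonstrate with the standard library. Snappy, LZ4, and Zstd require third-party packages, so the sketch below uses gzip, but the point carries over: repetitive event payloads compress heavily.

```python
import gzip
import json

# 1,000 similar events: the typical shape of a Kafka batch.
payload = "\n".join(
    json.dumps({"eventId": i, "customerId": f"cust-{i % 100}", "type": "order.created"})
    for i in range(1000)
).encode()

compressed = gzip.compress(payload)
ratio = len(payload) / len(compressed)
print(f"raw={len(payload)}B gzip={len(compressed)}B ratio={ratio:.1f}x")

# Repetitive JSON keys shrink dramatically, cutting bandwidth and storage.
assert len(compressed) < len(payload) / 4
```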

Consumer Implementation

Consumers read events from topics independently. Below is a production-grade consumer using KafkaJS, followed by Spring Kafka, aiokafka, and Confluent.Kafka equivalents:

interface ConsumerConfig {
  brokers: string[];
  groupId: string;
  clientId: string;
  topics: string[];
}

class KafkaConsumer {
  private kafka: Kafka;
  private consumer: any;
  private groupId: string;

  constructor(config: ConsumerConfig) {
    this.kafka = new Kafka({
      clientId: config.clientId,
      brokers: config.brokers,
      logLevel: logLevel.INFO,
      ssl: true,
      sasl: {
        mechanism: 'plain',
        username: process.env.KAFKA_USER || '',
        password: process.env.KAFKA_PASSWORD || '',
      },
    });

    this.groupId = config.groupId;
    this.consumer = this.kafka.consumer({
      groupId: config.groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
      maxBytesPerPartition: 1048576, // 1MB
      // Offset commit strategy
      allowAutoTopicCreation: false,
    });
  }

  async connect(): Promise<void> {
    await this.consumer.connect();
  }

  async subscribe(topics: string[]): Promise<void> {
    await this.consumer.subscribe({ topics, fromBeginning: false });
  }

  async run(
    onMessage: (topic: string, partition: number, message: any) => Promise<void>
  ): Promise<void> {
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        try {
          const value = JSON.parse(message.value?.toString() || '{}');
          await onMessage(topic, partition, value);
          // Offset committed automatically with autoCommit
        } catch (error) {
          console.error(`Error processing message from ${topic}:${partition}`, error);
          // Handle poison pill messages appropriately
          throw error;
        }
      },
      // Periodic auto-commit; set autoCommit: false for fully manual commits
      autoCommit: true,
      autoCommitInterval: 5000,
      autoCommitThreshold: 100,
    });
  }

  async disconnect(): Promise<void> {
    await this.consumer.disconnect();
  }

  // Reset offset for replaying events
  async resetOffset(topic: string, partition: number, offset: string): Promise<void> {
    await this.consumer.seek({ topic, partition, offset });
  }
}

// Usage example
const consumer = new KafkaConsumer({
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  groupId: 'order-processing-service',
  clientId: 'order-processor-1',
  topics: ['orders'],
});

await consumer.connect();
await consumer.subscribe(['orders']);

await consumer.run(async (topic, partition, message) => {
  console.log(`Processing order from partition ${partition}:`, message);

  // Process order (call external API, update database, etc.)
  await processOrder(message);

  // Offset is committed automatically by eachMessage
});
// Spring Kafka @KafkaListener approach
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.Acknowledgment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class KafkaConsumerService {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(
        topics = "orders",
        groupId = "order-processing-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            var value = objectMapper.readValue(record.value(), Map.class);
            System.out.printf("Processing order from partition %d: %s%n",
                record.partition(), value);
            processOrder(value);
            ack.acknowledge(); // Manual commit after successful processing
        } catch (Exception e) {
            log.error("Error processing message from {}:{}", record.topic(), record.partition(), e);
            throw new RuntimeException(e);
        }
    }
}

@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-service");
        config.put(ConsumerConfig.CLIENT_ID_CONFIG, "order-processor-1");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
# pip install aiokafka
from aiokafka import AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
import asyncio
import json
import os

class KafkaConsumerService:
    def __init__(self, brokers: list[str], group_id: str, client_id: str):
        self.brokers = brokers
        self.group_id = group_id
        self.client_id = client_id
        self.consumer: AIOKafkaConsumer | None = None

    async def connect(self) -> None:
        self.consumer = AIOKafkaConsumer(
            bootstrap_servers=",".join(self.brokers),
            group_id=self.group_id,
            client_id=self.client_id,
            session_timeout_ms=30000,
            heartbeat_interval_ms=3000,
            max_partition_fetch_bytes=1048576,
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
            security_protocol="SASL_SSL",  # required for SASL auth over TLS
            ssl_context=create_ssl_context(),
            sasl_mechanism="PLAIN",
            sasl_plain_username=os.getenv("KAFKA_USER", ""),
            sasl_plain_password=os.getenv("KAFKA_PASSWORD", ""),
        )
        await self.consumer.start()

    async def subscribe(self, topics: list[str]) -> None:
        self.consumer.subscribe(topics)

    async def run(self, on_message) -> None:
        async for msg in self.consumer:
            try:
                value = json.loads(msg.value.decode())
                await on_message(msg.topic, msg.partition, value)
            except Exception as e:
                print(f"Error processing message from {msg.topic}:{msg.partition}: {e}")
                raise

    async def disconnect(self) -> None:
        if self.consumer:
            await self.consumer.stop()

    # Reset offset for replaying events
    async def reset_offset(self, topic: str, partition: int, offset: int) -> None:
        from aiokafka import TopicPartition
        tp = TopicPartition(topic, partition)
        self.consumer.seek(tp, offset)


# Usage example
async def main():
    consumer = KafkaConsumerService(
        brokers=["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
        group_id="order-processing-service",
        client_id="order-processor-1",
    )
    await consumer.connect()
    await consumer.subscribe(["orders"])

    async def handle_message(topic, partition, message):
        print(f"Processing order from partition {partition}: {message}")
        await process_order(message)

    await consumer.run(handle_message)
// NuGet: Confluent.Kafka
using Confluent.Kafka;
using System.Text.Json;

public class KafkaConsumerService : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly ILogger<KafkaConsumerService> _logger;

    public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
    {
        _logger = logger;
        var config = new ConsumerConfig
        {
            BootstrapServers = "kafka-1:9092,kafka-2:9092,kafka-3:9092",
            GroupId = "order-processing-service",
            ClientId = "order-processor-1",
            SessionTimeoutMs = 30000,
            HeartbeatIntervalMs = 3000,
            MaxPartitionFetchBytes = 1048576,
            EnableAutoCommit = true,
            AutoCommitIntervalMs = 5000,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = Environment.GetEnvironmentVariable("KAFKA_USER") ?? "",
            SaslPassword = Environment.GetEnvironmentVariable("KAFKA_PASSWORD") ?? "",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            AutoOffsetReset = AutoOffsetReset.Latest,
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe("orders");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var result = _consumer.Consume(stoppingToken);
                var value = JsonSerializer.Deserialize<Dictionary<string, object>>(result.Message.Value);
                _logger.LogInformation(
                    "Processing order from partition {Partition}: {Message}",
                    result.Partition.Value, value);
                await ProcessOrderAsync(value!);
            }
            catch (OperationCanceledException) { break; }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing message");
                throw;
            }
        }
        _consumer.Close();
    }

    // Reset offset for replaying events
    public void ResetOffset(string topic, int partition, long offset)
    {
        _consumer.Seek(new TopicPartitionOffset(topic, partition, offset));
    }

    private Task ProcessOrderAsync(Dictionary<string, object> message) =>
        Task.CompletedTask; // Replace with real logic
}

Consumer Groups and Rebalancing

A consumer group coordinates multiple consumers that process a topic in parallel. Kafka assigns partitions to consumers automatically:

Topic: orders (6 partitions)

Consumer Group: order-processors
├── Consumer 1 → [Partition 0, 2, 4]
├── Consumer 2 → [Partition 1, 3, 5]
└── Consumer 3 joins → rebalancing occurs

// After rebalancing:
├── Consumer 1 → [Partition 0, 3]
├── Consumer 2 → [Partition 1, 4]
└── Consumer 3 → [Partition 2, 5]

Rebalancing occurs whenever consumers join or leave the group. During a rebalance:

  • All consumers in the group pause processing
  • Partitions are reassigned
  • Consumers resume from their last committed offsets

To reduce the impact of rebalancing:

  • Set session timeouts appropriately (30 seconds in the configuration above)
  • Make sure each message is processed well within the poll and session timeouts
  • Use static group membership (group.instance.id) in containerized deployments so restarts do not trigger a full rebalance
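Static membership is configured per consumer instance via the standard `group.instance.id` property. A hedged sketch of such a configuration as a plain properties dict — the `POD_NAME` variable and fallback id are illustrative, not from this article's services:

```python
import os

# Hypothetical consumer properties. group.instance.id must be unique and
# stable per container (e.g. derived from a StatefulSet pod name), so that
# a restarted consumer reclaims its old partitions without a full rebalance.
consumer_props = {
    "bootstrap.servers": "kafka-1:9092",
    "group.id": "order-processors",
    "group.instance.id": os.getenv("POD_NAME", "order-processor-0"),
    "session.timeout.ms": 30000,
}
print(sorted(consumer_props))
```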

Offset Management

Offsets track a consumer's progress through each partition. Common commit strategies:

// 1. Auto-commit (default) — simple but risky
// Risk: offsets may be committed before processing finishes (message loss on
// crash) or after processing without a commit yet (duplicates on restart)

// 2. Manual commit after processing
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message);
      // Commit only after successful processing
      await consumer.commitOffsets([
        {
          topic,
          partition,
          offset: (parseInt(message.offset) + 1).toString(),
        },
      ]);
    } catch (error) {
      // Don't commit, consumer will retry from this offset
      throw error;
    }
  },
  autoCommit: false,
});

// 3. Batch commit — balance throughput and safety
// Commit after processing N messages
let processedCount = 0;
const BATCH_SIZE = 100;

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    await processMessage(message);
    processedCount++;

    if (processedCount >= BATCH_SIZE) {
      await consumer.commitOffsets([
        {
          topic,
          partition,
          offset: (parseInt(message.offset) + 1).toString(),
        },
      ]);
      processedCount = 0;
    }
  },
  autoCommit: false,
});
// 1. Auto-commit — set enable.auto.commit=true in config (default)

// 2. Manual commit after processing (AckMode.MANUAL)
@KafkaListener(topics = "orders", groupId = "order-processors")
public void consumeManual(ConsumerRecord<String, String> record, Acknowledgment ack) {
    try {
        processMessage(record.value());
        ack.acknowledge(); // Commit only after successful processing
    } catch (Exception e) {
        // Don't ack — consumer will retry from this offset
        throw new RuntimeException(e);
    }
}

// 3. Batch commit — AckMode.COUNT
// In container factory: factory.getContainerProperties().setAckCount(100);
@KafkaListener(topics = "orders", groupId = "order-processors",
    containerFactory = "batchListenerContainerFactory")
public void consumeBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    for (var record : records) {
        processMessage(record.value());
    }
    ack.acknowledge(); // Commit after entire batch
}
import json

from aiokafka import AIOKafkaConsumer, TopicPartition

# 2. Manual commit after processing
consumer = AIOKafkaConsumer(
    "orders",
    bootstrap_servers="kafka-1:9092",
    group_id="order-processors",
    enable_auto_commit=False,
)
await consumer.start()

async for msg in consumer:
    try:
        await process_message(json.loads(msg.value.decode()))
        # Commit only after successful processing
        await consumer.commit({
            TopicPartition(msg.topic, msg.partition): msg.offset + 1
        })
    except Exception as e:
        # Don't commit — consumer will retry from this offset
        raise

# 3. Batch commit — process N messages then commit
processed_count = 0
BATCH_SIZE = 100
last_offsets = {}

async for msg in consumer:
    await process_message(json.loads(msg.value.decode()))
    processed_count += 1
    last_offsets[TopicPartition(msg.topic, msg.partition)] = msg.offset + 1

    if processed_count >= BATCH_SIZE:
        await consumer.commit(last_offsets)
        processed_count = 0
        last_offsets = {}
// 2. Manual commit after processing
var config = new ConsumerConfig
{
    EnableAutoCommit = false,
    // ...other config
};
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("orders");

while (true)
{
    var result = consumer.Consume();
    try
    {
        await ProcessMessageAsync(result.Message.Value);
        // Commit only after successful processing
        consumer.Commit(result);
    }
    catch (Exception)
    {
        // Don't commit — consumer will retry from this offset
        throw;
    }
}

// 3. Batch commit — commit after N messages
int processedCount = 0;
const int BATCH_SIZE = 100;
ConsumeResult<string, string>? lastResult = null;

while (true)
{
    var result = consumer.Consume();
    await ProcessMessageAsync(result.Message.Value);
    processedCount++;
    lastResult = result;

    if (processedCount >= BATCH_SIZE && lastResult != null)
    {
        consumer.Commit(lastResult);
        processedCount = 0;
        lastResult = null;
    }
}

Delivery Guarantees

Kafka supports three delivery semantics, each with its own tradeoffs:

At-Least-Once (most common)

The producer retries failed sends; the consumer commits offsets only after processing.

Risk: duplicates. If the consumer fails after processing but before committing the offset, the same message is processed again.

// Configuration for at-least-once
// (In KafkaJS, retries are a producer option; acks/timeout are passed per send)
const producer = kafka.producer({
  retry: { retries: 5 },
});
await producer.send({
  topic: 'orders',
  acks: 1, // Wait for leader acknowledgment
  timeout: 30000,
  messages: [{ key: orderId, value: JSON.stringify(orderData) }],
});

// Consumer: commit offset after processing
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    await processMessage(message); // Must be idempotent!
    await consumer.commitOffsets([...]);
  },
});
// Configuration for at-least-once
@Bean
public ProducerFactory<String, String> atLeastOnceProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.ACKS_CONFIG, "1"); // Leader acknowledgment
    config.put(ProducerConfig.RETRIES_CONFIG, 5);
    config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    return new DefaultKafkaProducerFactory<>(config);
}

// Consumer: commit offset after processing (AckMode.MANUAL)
@KafkaListener(topics = "orders", groupId = "processors")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
    processMessage(record.value()); // Must be idempotent!
    ack.acknowledge();
}
# Configuration for at-least-once
producer = AIOKafkaProducer(
    bootstrap_servers="kafka-1:9092",
    acks=1,  # Wait for leader acknowledgment
    request_timeout_ms=30000,
)

# Consumer: commit offset after processing
async for msg in consumer:
    await process_message(json.loads(msg.value.decode()))  # Must be idempotent!
    await consumer.commit({TopicPartition(msg.topic, msg.partition): msg.offset + 1})
// Configuration for at-least-once
var producerConfig = new Confluent.Kafka.ProducerConfig
{
    Acks = Acks.Leader, // Wait for leader acknowledgment
    MessageTimeoutMs = 30000,
    // Retries handled by EnableIdempotence or manual retry logic
};

// Consumer: commit offset after processing
while (true)
{
    var result = consumer.Consume();
    await ProcessMessageAsync(result.Message.Value); // Must be idempotent!
    consumer.Commit(result);
}

Mitigation: make message processing idempotent using deduplication:

// Idempotent processing example
async function processOrderIdempotent(order: any) {
  // Check if we've already processed this order
  const rows = await database.query(
    'SELECT id FROM processed_orders WHERE order_id = ?',
    [order.orderId]
  );

  // Note: check length — an empty result array is still truthy
  if (rows.length > 0) {
    console.log('Order already processed, skipping');
    return;
  }

  // Process order
  await database.execute('INSERT INTO orders VALUES (?)', [order]);
  await database.execute('INSERT INTO processed_orders VALUES (?, ?)', [
    order.orderId,
    Date.now(),
  ]);
}
@Service
public class IdempotentOrderService {
    private final JdbcTemplate jdbcTemplate;

    public void processOrderIdempotent(Map<String, Object> order) {
        String orderId = (String) order.get("orderId");

        // Check if we've already processed this order
        Integer count = jdbcTemplate.queryForObject(
            "SELECT COUNT(*) FROM processed_orders WHERE order_id = ?",
            Integer.class, orderId);

        if (count != null && count > 0) {
            log.info("Order already processed, skipping: {}", orderId);
            return;
        }

        // Process order
        jdbcTemplate.update("INSERT INTO orders VALUES (?)", orderId);
        jdbcTemplate.update(
            "INSERT INTO processed_orders VALUES (?, ?)",
            orderId, System.currentTimeMillis());
    }
}
import time

async def process_order_idempotent(order: dict, db) -> None:
    order_id = order["orderId"]

    # Check if we've already processed this order
    result = await db.fetch_one(
        "SELECT id FROM processed_orders WHERE order_id = :order_id",
        {"order_id": order_id},
    )

    if result:
        print(f"Order already processed, skipping: {order_id}")
        return

    # Process order
    await db.execute("INSERT INTO orders VALUES (:order)", {"order": order_id})
    await db.execute(
        "INSERT INTO processed_orders VALUES (:order_id, :ts)",
        # Wall-clock epoch millis — loop.time() is monotonic, not epoch time
        {"order_id": order_id, "ts": int(time.time() * 1000)},
    )
public class IdempotentOrderService
{
    private readonly IDbConnection _db;

    public async Task ProcessOrderIdempotentAsync(Dictionary<string, object> order)
    {
        var orderId = order["orderId"].ToString()!;

        // Check if we've already processed this order
        var exists = await _db.QueryFirstOrDefaultAsync<int>(
            "SELECT COUNT(1) FROM processed_orders WHERE order_id = @OrderId",
            new { OrderId = orderId });

        if (exists > 0)
        {
            Console.WriteLine($"Order already processed, skipping: {orderId}");
            return;
        }

        // Process order
        await _db.ExecuteAsync("INSERT INTO orders VALUES (@OrderId)", new { OrderId = orderId });
        await _db.ExecuteAsync(
            "INSERT INTO processed_orders VALUES (@OrderId, @Ts)",
            new { OrderId = orderId, Ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() });
    }
}
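To make the guarantee concrete, here is a tiny in-memory Python sketch of the same dedup idea (names are illustrative, not the services above): redelivering an event, as at-least-once permits, leaves state unchanged:

```python
processed_ids: set[str] = set()
orders: list[str] = []

def process_order_idempotent(order: dict) -> bool:
    """Apply the order exactly once; return False for a duplicate delivery."""
    order_id = order["orderId"]
    if order_id in processed_ids:
        return False           # already handled — safe to skip
    orders.append(order_id)    # the actual side effect
    processed_ids.add(order_id)
    return True

# At-least-once redelivery: the second delivery is a no-op
assert process_order_idempotent({"orderId": "order-123"}) is True
assert process_order_idempotent({"orderId": "order-123"}) is False
assert orders == ["order-123"]
```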

At-Most-Once

The producer sends once, with no retries; the consumer commits offsets before processing.

Risk: message loss if processing fails before it completes.

// Configuration for at-most-once
// (In KafkaJS, acks is passed per send)
const producer = kafka.producer({
  retry: { retries: 0 }, // No retries
});
await producer.send({ topic: 'orders', acks: 0, messages: [...] }); // Fire and forget

// Consumer: commit offset before processing
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    await consumer.commitOffsets([...]); // Commit first
    await processMessage(message); // Process after
  },
});
// Configuration for at-most-once
@Bean
public ProducerFactory<String, String> atMostOnceProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.ACKS_CONFIG, "0"); // No acknowledgment
    config.put(ProducerConfig.RETRIES_CONFIG, 0);
    return new DefaultKafkaProducerFactory<>(config);
}

// Consumer: commit offset before processing
@KafkaListener(topics = "orders", groupId = "processors")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
    ack.acknowledge(); // Commit first
    processMessage(record.value()); // Process after
}
# Configuration for at-most-once
producer = AIOKafkaProducer(
    bootstrap_servers="kafka-1:9092",
    acks=0,  # No acknowledgment, fire and forget
)

# Consumer: commit offset before processing
async for msg in consumer:
    await consumer.commit({  # Commit first
        TopicPartition(msg.topic, msg.partition): msg.offset + 1
    })
    await process_message(json.loads(msg.value.decode()))  # Process after
// Configuration for at-most-once
var producerConfig = new Confluent.Kafka.ProducerConfig
{
    Acks = Acks.None, // No acknowledgment, fire and forget
};

// Consumer: commit offset before processing
while (true)
{
    var result = consumer.Consume();
    consumer.Commit(result); // Commit first
    await ProcessMessageAsync(result.Message.Value); // Process after
}

Exactly-Once (Idempotent Producer + Transactions)

The idempotent producer deduplicates retried sends on the broker; transactional writes guarantee atomicity.

// Producer configuration for exactly-once
const producer = kafka.producer({
  idempotent: true, // Enable idempotent producer
  transactionalId: 'order-tx-1', // Required for transactions
  maxInFlightRequests: 1, // Preserve ordering across retries
  transactionTimeout: 60000,
});

// Transactional write
const transaction = await producer.transaction();
try {
  await transaction.send({
    topic: 'orders',
    messages: [
      { key: orderId, value: JSON.stringify(orderData) },
    ],
  });
  await transaction.commit();
} catch (error) {
  await transaction.abort();
  throw error;
}

// Consumer: only read committed messages
const consumer = kafka.consumer({
  groupId: 'order-processors',
  readUncommitted: false, // Read committed — skip writes from aborted transactions
});
// Producer configuration for exactly-once
@Bean
public ProducerFactory<String, String> exactlyOnceProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
    config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-1");
    config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
    return new DefaultKafkaProducerFactory<>(config);
}

// Transactional write
kafkaTemplate.executeInTransaction(kt -> {
    kt.send("orders", orderId, objectMapper.writeValueAsString(orderData));
    return true;
});

// Consumer: read committed isolation level
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
# Producer configuration for exactly-once
producer = AIOKafkaProducer(
    bootstrap_servers="kafka-1:9092",
    enable_idempotence=True,
    max_in_flight_requests_per_connection=5,
    transactional_id="order-tx-1",
    transaction_timeout_ms=60000,
)
await producer.start()

# Transactional write
async with producer.transaction():
    await producer.send("orders", key=order_id.encode(),
                        value=json.dumps(order_data).encode())

# Consumer: read committed isolation level
consumer = AIOKafkaConsumer(
    "orders",
    bootstrap_servers="kafka-1:9092",
    group_id="order-processors",
    isolation_level="read_committed",
)
// Producer configuration for exactly-once
var producerConfig = new Confluent.Kafka.ProducerConfig
{
    EnableIdempotence = true,
    MaxInFlight = 5,
    TransactionalId = "order-tx-1",
    TransactionTimeoutMs = 60000,
};
var producer = new ProducerBuilder<string, string>(producerConfig).Build();

// Transactional write
producer.InitTransactions(TimeSpan.FromSeconds(30));
producer.BeginTransaction();
try
{
    producer.Produce("orders", new Message<string, string>
    {
        Key = orderId,
        Value = JsonSerializer.Serialize(orderData),
    });
    producer.CommitTransaction();
}
catch
{
    producer.AbortTransaction();
    throw;
}

// Consumer: read committed isolation level
var consumerConfig = new ConsumerConfig
{
    IsolationLevel = IsolationLevel.ReadCommitted,
    GroupId = "order-processors",
};

Guarantees:

  • Idempotent producer: the broker deduplicates using the producer ID and sequence numbers
  • Transactional writes: writes to multiple topics succeed or fail atomically
  • Read committed: consumers see only committed messages, never uncommitted writes from aborted transactions
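A toy Python model of the first bullet — broker-side dedup keyed by producer ID and sequence number. Real brokers track a window of recent sequence numbers per partition; this sketch keeps only the latest:

```python
class PartitionLog:
    """Toy model of idempotent-producer dedup on a single partition."""

    def __init__(self) -> None:
        self.records: list[str] = []
        self.last_seq: dict[int, int] = {}  # producer id -> last appended sequence

    def append(self, pid: int, seq: int, value: str) -> bool:
        """Append unless this (pid, seq) was already seen — i.e. a retried batch."""
        if seq <= self.last_seq.get(pid, -1):
            return False          # duplicate of an already-acked batch: drop it
        self.records.append(value)
        self.last_seq[pid] = seq
        return True

log = PartitionLog()
log.append(pid=1, seq=0, value="order-123")
log.append(pid=1, seq=0, value="order-123")  # retry after a lost ack: deduped
log.append(pid=1, seq=1, value="order-456")
print(log.records)  # ['order-123', 'order-456']
```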

Schema Registry and Schema Evolution

Schemas define the structure of events. A Schema Registry lets teams evolve schemas safely:

import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry';

const schemaRegistry = new SchemaRegistry({
  host: 'http://schema-registry:8081',
  auth: {
    username: process.env.SR_USER,
    password: process.env.SR_PASSWORD,
  },
});

// Define an order event schema
const orderSchema = {
  type: 'record',
  name: 'Order',
  namespace: 'com.example.orders',
  fields: [
    { name: 'orderId', type: 'string' },
    { name: 'customerId', type: 'string' },
    { name: 'amount', type: 'double' },
    { name: 'timestamp', type: 'long' },
    // New field with default (backward compatible)
    { name: 'currency', type: 'string', default: 'USD' },
  ],
};

// Register schema under the subject 'orders-value'
const { id: schemaId } = await schemaRegistry.register(
  { type: SchemaType.AVRO, schema: JSON.stringify(orderSchema) },
  { subject: 'orders-value' }
);

// Serialize with schema
async function serializeOrder(order: any): Promise<Buffer> {
  const id = await schemaRegistry.getLatestSchemaId('orders-value');
  return schemaRegistry.encode(id, order);
}

// Deserialize with schema (the schema id is read from the message itself)
async function deserializeOrder(buffer: Buffer): Promise<any> {
  return schemaRegistry.decode(buffer);
}

// Send Avro-serialized event
const serialized = await serializeOrder({
  orderId: 'order-123',
  customerId: 'cust-456',
  amount: 99.99,
  timestamp: Date.now(),
  currency: 'USD',
});

await producer.send({
  topic: 'orders',
  messages: [{ key: 'order-123', value: serialized }],
});
// Maven: io.confluent:kafka-avro-serializer
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

@Service
public class SchemaRegistryProducerService {
    private final KafkaTemplate<String, GenericRecord> avroTemplate;

    private static final String ORDER_SCHEMA_JSON = """
        {
          "type": "record",
          "name": "Order",
          "namespace": "com.example.orders",
          "fields": [
            {"name": "orderId", "type": "string"},
            {"name": "customerId", "type": "string"},
            {"name": "amount", "type": "double"},
            {"name": "timestamp", "type": "long"},
            {"name": "currency", "type": "string", "default": "USD"}
          ]
        }""";

    private final Schema orderSchema = new Schema.Parser().parse(ORDER_SCHEMA_JSON);

    public void sendOrderEvent(String orderId, String customerId, double amount) {
        GenericRecord record = new GenericData.Record(orderSchema);
        record.put("orderId", orderId);
        record.put("customerId", customerId);
        record.put("amount", amount);
        record.put("timestamp", System.currentTimeMillis());
        record.put("currency", "USD");

        avroTemplate.send("orders", orderId, record);
    }
}

// Spring Boot config for Avro serializer
@Bean
public ProducerFactory<String, GenericRecord> avroProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
    config.put("schema.registry.url", "http://schema-registry:8081");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
}
# pip install confluent-kafka[avro] fastavro
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka import SerializingProducer, DeserializingConsumer
import os
import time

schema_registry_client = SchemaRegistryClient({
    "url": "http://schema-registry:8081",
    "basic.auth.user.info": f"{os.getenv('SR_USER')}:{os.getenv('SR_PASSWORD')}",
})

order_schema_str = """
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.orders",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}"""

avro_serializer = AvroSerializer(schema_registry_client, order_schema_str)
avro_deserializer = AvroDeserializer(schema_registry_client)

producer = SerializingProducer({
    "bootstrap.servers": "kafka-1:9092",
    "key.serializer": lambda key, ctx: key.encode(),
    "value.serializer": avro_serializer,
})

# Send Avro-serialized event
producer.produce(
    topic="orders",
    key="order-123",
    value={
        "orderId": "order-123",
        "customerId": "cust-456",
        "amount": 99.99,
        "timestamp": int(time.time() * 1000),
        "currency": "USD",
    },
)
producer.flush()
// NuGet: Confluent.SchemaRegistry.Serdes.Avro
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Confluent.Kafka;
using Avro;
using Avro.Generic;

var schemaRegistryConfig = new SchemaRegistryConfig
{
    Url = "http://schema-registry:8081",
    BasicAuthUserInfo = $"{Environment.GetEnvironmentVariable("SR_USER")}:{Environment.GetEnvironmentVariable("SR_PASSWORD")}",
};

using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

const string orderSchemaJson = """
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.orders",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}""";

var schema = (RecordSchema)Schema.Parse(orderSchemaJson);

var producerConfig = new ProducerConfig { BootstrapServers = "kafka-1:9092" };
using var producer = new ProducerBuilder<string, GenericRecord>(producerConfig)
    .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
    .Build();

// Send Avro-serialized event
var record = new GenericRecord(schema);
record.Add("orderId", "order-123");
record.Add("customerId", "cust-456");
record.Add("amount", 99.99);
record.Add("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
record.Add("currency", "USD");

await producer.ProduceAsync("orders", new Message<string, GenericRecord>
{
    Key = "order-123",
    Value = record,
});

Schema Evolution Rules

Maintain compatibility with a versioned schema strategy:

BACKWARD compatibility: New schema can read data written with old schema
├── Add fields with defaults ✓
├── Delete fields ✓
└── Rename fields ✗

FORWARD compatibility: Old schema can read data written with new schema
├── Add fields ✓
├── Delete fields with defaults ✓
└── Rename fields ✗

FULL compatibility: Both backward and forward
└── Only add/delete fields with defaults ✓

Transitive variants (e.g. BACKWARD_TRANSITIVE): the rule holds against all registered versions, not just the latest
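The "add fields with defaults" rule can be illustrated with a small Python sketch of default-filling reads, mimicking what an Avro reader does when its newer schema has a field the older writer schema lacks (the schema and defaults here are illustrative):

```python
# Field added in schema v2, with a default — the old writer never wrote it
NEW_SCHEMA_DEFAULTS = {"currency": "USD"}

def read_with_new_schema(old_record: dict) -> dict:
    """Reader uses the new schema; fields missing from old data are filled
    from schema defaults. This is why adding a defaulted field is
    backward compatible."""
    record = dict(old_record)
    for field, default in NEW_SCHEMA_DEFAULTS.items():
        record.setdefault(field, default)
    return record

old = {"orderId": "order-123", "amount": 99.99}  # written with schema v1
print(read_with_new_schema(old))
# {'orderId': 'order-123', 'amount': 99.99, 'currency': 'USD'}
```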

Event Sourcing and CQRS Patterns

Event sourcing makes events the source of truth: instead of storing only the final state, store every event that changed it:

// Event Sourcing: Store all events, derive state
interface OrderEvent {
  eventType: 'OrderCreated' | 'OrderPaid' | 'OrderShipped' | 'OrderCancelled';
  aggregateId: string;
  timestamp: number;
  data: any;
}

class OrderAggregate {
  private events: OrderEvent[] = [];
  private orderId: string;

  async applyEvent(event: OrderEvent): Promise<void> {
    this.events.push(event);

    // Send to event store (Kafka)
    await producer.sendEvent('order-events', event.aggregateId, event);

    // Update read models for CQRS
    await this.updateReadModel(event);
  }

  private async updateReadModel(event: OrderEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await database.execute(
          'INSERT INTO order_summary (order_id, status) VALUES (?, ?)',
          [event.aggregateId, 'pending']
        );
        break;
      case 'OrderPaid':
        await database.execute(
          'UPDATE order_summary SET status = ? WHERE order_id = ?',
          ['paid', event.aggregateId]
        );
        break;
      case 'OrderShipped':
        await database.execute(
          'UPDATE order_summary SET status = ? WHERE order_id = ?',
          ['shipped', event.aggregateId]
        );
        break;
    }
  }

  // Rebuild state from event stream
  static async loadFromEvents(
    orderId: string,
    events: OrderEvent[]
  ): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate();
    aggregate.orderId = orderId;
    aggregate.events = events;
    return aggregate;
  }
}

// Usage
const order = new OrderAggregate();
await order.applyEvent({
  eventType: 'OrderCreated',
  aggregateId: 'order-123',
  timestamp: Date.now(),
  data: {
    customerId: 'cust-456',
    items: [{ productId: 'prod-1', quantity: 2 }],
  },
});
// Event Sourcing: Store all events, derive state
public sealed interface OrderEvent permits OrderCreated, OrderPaid, OrderShipped, OrderCancelled {
    String aggregateId();
    long timestamp();
}

public record OrderCreated(String aggregateId, long timestamp, String customerId, List<OrderItem> items) implements OrderEvent {}
public record OrderPaid(String aggregateId, long timestamp, double amount) implements OrderEvent {}
public record OrderShipped(String aggregateId, long timestamp, String trackingNumber) implements OrderEvent {}

@Service
public class OrderAggregate {
    private final List<OrderEvent> events = new ArrayList<>();
    private final KafkaProducerService producerService;
    private final JdbcTemplate jdbcTemplate;

    public void applyEvent(OrderEvent event) throws Exception {
        events.add(event);
        // Send to event store (Kafka)
        producerService.sendEvent("order-events", event.aggregateId(),
            objectMapper.writeValueAsString(event)).get();
        // Update read models for CQRS
        updateReadModel(event);
    }

    private void updateReadModel(OrderEvent event) {
        switch (event) {
            case OrderCreated e -> jdbcTemplate.update(
                "INSERT INTO order_summary (order_id, status) VALUES (?, ?)",
                e.aggregateId(), "pending");
            case OrderPaid e -> jdbcTemplate.update(
                "UPDATE order_summary SET status = ? WHERE order_id = ?",
                "paid", e.aggregateId());
            case OrderShipped e -> jdbcTemplate.update(
                "UPDATE order_summary SET status = ? WHERE order_id = ?",
                "shipped", e.aggregateId());
            default -> {}
        }
    }

    public static OrderAggregate loadFromEvents(String orderId, List<OrderEvent> events) {
        var aggregate = new OrderAggregate(null, null);
        aggregate.events.addAll(events);
        return aggregate;
    }
}
from dataclasses import dataclass, field
from typing import Literal
import asyncio

@dataclass
class OrderEvent:
    event_type: Literal["OrderCreated", "OrderPaid", "OrderShipped", "OrderCancelled"]
    aggregate_id: str
    timestamp: int
    data: dict

class OrderAggregate:
    def __init__(self, producer, db):
        self.events: list[OrderEvent] = []
        self.order_id: str = ""
        self._producer = producer
        self._db = db

    async def apply_event(self, event: OrderEvent) -> None:
        self.events.append(event)
        # Send to event store (Kafka)
        await self._producer.send_event("order-events", event.aggregate_id, {
            "eventType": event.event_type,
            "aggregateId": event.aggregate_id,
            "timestamp": event.timestamp,
            "data": event.data,
        })
        # Update read models for CQRS
        await self._update_read_model(event)

    async def _update_read_model(self, event: OrderEvent) -> None:
        match event.event_type:
            case "OrderCreated":
                await self._db.execute(
                    "INSERT INTO order_summary (order_id, status) VALUES (:id, :status)",
                    {"id": event.aggregate_id, "status": "pending"},
                )
            case "OrderPaid":
                await self._db.execute(
                    "UPDATE order_summary SET status = :status WHERE order_id = :id",
                    {"status": "paid", "id": event.aggregate_id},
                )
            case "OrderShipped":
                await self._db.execute(
                    "UPDATE order_summary SET status = :status WHERE order_id = :id",
                    {"status": "shipped", "id": event.aggregate_id},
                )

    @classmethod
    def load_from_events(cls, order_id: str, events: list[OrderEvent]) -> "OrderAggregate":
        aggregate = cls(None, None)
        aggregate.order_id = order_id
        aggregate.events = events
        return aggregate


# Usage
import time

order = OrderAggregate(producer, db)
await order.apply_event(OrderEvent(
    event_type="OrderCreated",
    aggregate_id="order-123",
    # Wall-clock epoch millis — loop.time() is monotonic, not epoch time
    timestamp=int(time.time() * 1000),
    data={"customerId": "cust-456", "items": [{"productId": "prod-1", "quantity": 2}]},
))
// Event Sourcing: Store all events, derive state
public abstract record OrderEvent(string AggregateId, long Timestamp);
public record OrderCreated(string AggregateId, long Timestamp, string CustomerId, List<OrderItem> Items) : OrderEvent(AggregateId, Timestamp);
public record OrderPaid(string AggregateId, long Timestamp, double Amount) : OrderEvent(AggregateId, Timestamp);
public record OrderShipped(string AggregateId, long Timestamp, string TrackingNumber) : OrderEvent(AggregateId, Timestamp);

public class OrderAggregate
{
    private readonly List<OrderEvent> _events = [];
    private readonly KafkaProducerService _producer;
    private readonly IDbConnection _db;

    public OrderAggregate(KafkaProducerService producer, IDbConnection db)
    {
        _producer = producer;
        _db = db;
    }

    public async Task ApplyEventAsync(OrderEvent evt)
    {
        _events.Add(evt);
        // Send to event store (Kafka)
        await _producer.SendEventAsync("order-events", evt.AggregateId, evt);
        // Update read models for CQRS
        await UpdateReadModelAsync(evt);
    }

    private async Task UpdateReadModelAsync(OrderEvent evt)
    {
        // Declare the tuple type explicitly so the default arm can be (null, null)
        (string? Sql, object? Args) cmd = evt switch
        {
            OrderCreated e => ("INSERT INTO order_summary (order_id, status) VALUES (@Id, @Status)",
                new { Id = e.AggregateId, Status = "pending" }),
            OrderPaid e => ("UPDATE order_summary SET status = @Status WHERE order_id = @Id",
                new { Id = e.AggregateId, Status = "paid" }),
            OrderShipped e => ("UPDATE order_summary SET status = @Status WHERE order_id = @Id",
                new { Id = e.AggregateId, Status = "shipped" }),
            _ => (null, null),
        };
        if (cmd.Sql != null)
            await _db.ExecuteAsync(cmd.Sql, cmd.Args);
    }

    public static OrderAggregate LoadFromEvents(string orderId, IEnumerable<OrderEvent> events)
    {
        var aggregate = new OrderAggregate(null!, null!);
        aggregate._events.AddRange(events);
        return aggregate;
    }
}

Benefits:

  • A complete audit log of every state change
  • Time travel: rebuild state as of any point in time
  • Replay events for debugging or analytics
  • Separate read and write models (CQRS)

Challenges:

  • Eventual consistency must be managed
  • Rebuilding state can be slow
  • Data deletion (GDPR) requires careful handling
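Conceptually, rebuilding state is a fold over the event log. A short Python sketch using the event shapes from the examples above (the status mapping is illustrative):

```python
from functools import reduce

STATUS_BY_EVENT = {
    "OrderCreated": "pending",
    "OrderPaid": "paid",
    "OrderShipped": "shipped",
    "OrderCancelled": "cancelled",
}

def apply(state: dict, event: dict) -> dict:
    """Pure transition function: current state + event -> next state."""
    return {**state, "status": STATUS_BY_EVENT[event["eventType"]]}

events = [
    {"eventType": "OrderCreated", "aggregateId": "order-123"},
    {"eventType": "OrderPaid", "aggregateId": "order-123"},
    {"eventType": "OrderShipped", "aggregateId": "order-123"},
]

# Replaying the full log reproduces the current state; replaying a prefix
# gives the state at any earlier point ("time travel").
final = reduce(apply, events, {"orderId": "order-123"})
print(final["status"])                           # shipped
print(reduce(apply, events[:2], {})["status"])   # paid
```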

Kafka Streams: Stateful Stream Processing

Kafka Streams processes events with stateful operations such as aggregations and joins:

// Kafka Streams topology (pseudo-code with KafkaJS equivalent)

// 1. Stream-Stream Join: Correlate events from different topics
async function streamStreamJoin() {
  // Consume from orders and payments topics
  // Join on orderId within time window
  // Emit joined events to order-fulfillment topic

  const orderStream = new Map();
  const paymentStream = new Map();
  const joinWindow = 30000; // 30 seconds

  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      const { orderId, timestamp, ...data } = JSON.parse(message.value);

      if (topic === 'orders') {
        orderStream.set(orderId, { ...data, timestamp });
      } else if (topic === 'payments') {
        paymentStream.set(orderId, { ...data, timestamp });
      }

      // Check for join opportunity
      if (orderStream.has(orderId) && paymentStream.has(orderId)) {
        const order = orderStream.get(orderId);
        const payment = paymentStream.get(orderId);

        if (Math.abs(order.timestamp - payment.timestamp) <= joinWindow) {
          // Emit joined event
          await producer.sendEvent('order-fulfillment', orderId, {
            orderId,
            order,
            payment,
            joinedAt: Date.now(),
          });

          orderStream.delete(orderId);
          paymentStream.delete(orderId);
        }
      }
    },
  });
}

// 2. Windowed Aggregation: Compute metrics over time windows
async function windowedAggregation() {
  const windows = new Map();
  const WINDOW_SIZE = 60000; // 1 minute
  const WINDOW_ADVANCE = 10000; // 10 seconds

  await consumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value);

      // Hopping window k covers [k * ADVANCE, k * ADVANCE + SIZE),
      // so each event belongs to SIZE / ADVANCE overlapping windows
      const newest = Math.floor(event.timestamp / WINDOW_ADVANCE);
      const oldest = newest - WINDOW_SIZE / WINDOW_ADVANCE + 1;

      for (let windowKey = oldest; windowKey <= newest; windowKey++) {
        if (!windows.has(windowKey)) {
          windows.set(windowKey, { count: 0, sum: 0 });
        }

        const window = windows.get(windowKey);
        window.count++;
        window.sum += event.amount;

        // Emit updated windowed metric
        await producer.sendEvent('revenue-metrics', `window-${windowKey}`, {
          window: windowKey,
          count: window.count,
          sum: window.sum,
          avg: window.sum / window.count,
        });
      }
    },
  });
}

// 3. Stateful Map: Maintain per-key state
async function statefulProcessing() {
  const userState = new Map();

  await consumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value);
      const userId = message.key.toString();

      if (!userState.has(userId)) {
        userState.set(userId, { events: [], lastUpdate: 0 });
      }

      const state = userState.get(userId);
      state.events.push(event);
      state.lastUpdate = Date.now();

      // Emit enriched event
      await producer.sendEvent('enriched-events', userId, {
        event,
        userContext: {
          totalEvents: state.events.length,
          lastUpdate: state.lastUpdate,
        },
      });
    },
  });
}
// Kafka Streams DSL — native Java API
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

@Configuration
public class KafkaStreamsTopology {

    // 1. Stream-Stream Join
    @Bean
    public KStream<String, String> streamStreamJoin(StreamsBuilder builder) {
        KStream<String, String> orders = builder.stream("orders");
        KStream<String, String> payments = builder.stream("payments");

        JoinWindows joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30));

        orders.join(
            payments,
            (order, payment) -> {
                try {
                    var joined = Map.of("order", order, "payment", payment,
                        "joinedAt", System.currentTimeMillis());
                    return objectMapper.writeValueAsString(joined);
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e); // writeValueAsString is checked
                }
            },
            joinWindow,
            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
        ).to("order-fulfillment");

        return orders;
    }

    // 2. Windowed Aggregation
    @Bean
    public KStream<String, String> windowedAggregation(StreamsBuilder builder) {
        builder.<String, String>stream("orders")
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
                .advanceBy(Duration.ofSeconds(10)))
            .aggregate(
                () -> "{\"count\":0,\"sum\":0.0}",
                (key, value, aggregate) -> {
                    // Parse and update aggregate
                    var agg = parseAggregate(aggregate);
                    var event = parseEvent(value);
                    agg.put("count", (int) agg.get("count") + 1);
                    agg.put("sum", (double) agg.get("sum") + (double) event.get("amount"));
                    agg.put("avg", (double) agg.get("sum") / (int) agg.get("count"));
                    return objectMapper.writeValueAsString(agg);
                },
                Materialized.with(Serdes.String(), Serdes.String())
            )
            .toStream()
            .map((windowedKey, value) -> KeyValue.pair(
                "window-" + windowedKey.window().start(), value))
            .to("revenue-metrics");

        return builder.stream("orders");
    }

    // 3. Stateful Map with state store
    @Bean
    public KStream<String, String> statefulProcessing(StreamsBuilder builder) {
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("user-state"),
                Serdes.String(), Serdes.String());
        builder.addStateStore(storeBuilder);

        builder.<String, String>stream("events")
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private KeyValueStore<String, String> store;

                @Override
                public void init(ProcessorContext ctx) {
                    store = ctx.getStateStore("user-state");
                }

                @Override
                public KeyValue<String, String> transform(String userId, String value) {
                    var state = Optional.ofNullable(store.get(userId))
                        .map(s -> parseState(s))
                        .orElse(new HashMap<>());
                    state.put("lastUpdate", System.currentTimeMillis());
                    state.merge("totalEvents", 1, (a, b) -> (int)a + (int)b);
                    store.put(userId, objectMapper.writeValueAsString(state));
                    return KeyValue.pair(userId, buildEnrichedEvent(value, state));
                }
            }, "user-state")
            .to("enriched-events");

        return builder.stream("events");
    }
}
# pip install faust-streaming (Faust — Python Kafka Streams)
import faust
import json
import time

app = faust.App("stream-processor", broker="kafka://kafka-1:9092")

orders_topic = app.topic("orders", value_type=bytes)
payments_topic = app.topic("payments", value_type=bytes)
fulfillment_topic = app.topic("order-fulfillment", value_type=bytes)
metrics_topic = app.topic("revenue-metrics", value_type=bytes)
enriched_topic = app.topic("enriched-events", value_type=bytes)

# 1. Stream-Stream Join (in-memory within window)
order_table = app.Table("order-stream", default=dict)
payment_table = app.Table("payment-stream", default=dict)
JOIN_WINDOW = 30  # seconds

@app.agent(orders_topic)
async def process_orders(orders):
    async for order_bytes in orders:
        order = json.loads(order_bytes)
        order_id = order["orderId"]
        order_table[order_id] = {**order, "ts": time.time()}
        await _try_join(order_id)

@app.agent(payments_topic)
async def process_payments(payments):
    async for payment_bytes in payments:
        payment = json.loads(payment_bytes)
        order_id = payment["orderId"]
        payment_table[order_id] = {**payment, "ts": time.time()}
        await _try_join(order_id)

async def _try_join(order_id: str) -> None:
    order = order_table.get(order_id)
    payment = payment_table.get(order_id)
    if order and payment and abs(order["ts"] - payment["ts"]) <= JOIN_WINDOW:
        await fulfillment_topic.send(key=order_id, value=json.dumps({
            "orderId": order_id, "order": order, "payment": payment,
            "joinedAt": int(time.time() * 1000),
        }).encode())
        del order_table[order_id]
        del payment_table[order_id]

# 2. Windowed Aggregation
window_state = app.Table("revenue-windows", default=lambda: {"count": 0, "sum": 0.0})
WINDOW_ADVANCE = 10  # seconds

@app.agent(orders_topic)
async def windowed_aggregation(events):
    async for event_bytes in events:
        event = json.loads(event_bytes)
        window_key = int(event["timestamp"] / 1000 / WINDOW_ADVANCE)
        state = window_state[window_key]
        state["count"] += 1
        state["sum"] += event.get("amount", 0)
        state["avg"] = state["sum"] / state["count"]
        window_state[window_key] = state
        await metrics_topic.send(key=f"window-{window_key}", value=json.dumps(state).encode())

# 3. Stateful Map
user_state_table = app.Table("user-state", default=lambda: {"events": [], "lastUpdate": 0})

@app.agent(app.topic("events", value_type=bytes))
async def stateful_processing(events):
    async for event_bytes in events:
        event = json.loads(event_bytes)
        user_id = event.get("userId", "unknown")
        state = user_state_table[user_id]
        state["events"].append(event)
        state["lastUpdate"] = int(time.time() * 1000)
        user_state_table[user_id] = state
        await enriched_topic.send(key=user_id, value=json.dumps({
            "event": event,
            "userContext": {"totalEvents": len(state["events"]), "lastUpdate": state["lastUpdate"]},
        }).encode())
// StreamProcessing with Confluent.Kafka — manual stateful approach
// (For full Kafka Streams API use Java; .NET uses manual consumer patterns)
using Confluent.Kafka;
using System.Collections.Concurrent;

public class KafkaStreamProcessor : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly IProducer<string, string> _producer;

    // 1. Stream-Stream Join state
    private readonly ConcurrentDictionary<string, (string Data, long Ts)> _orderStream = new();
    private readonly ConcurrentDictionary<string, (string Data, long Ts)> _paymentStream = new();
    private const long JoinWindowMs = 30_000;

    // 2. Windowed Aggregation state
    private readonly ConcurrentDictionary<long, (int Count, double Sum)> _windows = new();
    private const long WindowAdvanceMs = 10_000;

    // 3. Stateful Map
    private readonly ConcurrentDictionary<string, (List<string> Events, long LastUpdate)> _userState = new();

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        _consumer.Subscribe(new[] { "orders", "payments", "events" });

        while (!ct.IsCancellationRequested)
        {
            var result = _consumer.Consume(ct);
            var value = result.Message.Value;
            var key = result.Message.Key;

            switch (result.Topic)
            {
                case "orders":
                    await HandleOrderAsync(key, value);
                    break;
                case "payments":
                    await HandlePaymentAsync(key, value);
                    break;
                case "events":
                    await HandleStatefulEventAsync(key, value);
                    break;
            }
        }
    }

    private async Task HandleOrderAsync(string orderId, string data)
    {
        _orderStream[orderId] = (data, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
        await TryJoinAsync(orderId);
    }

    private async Task HandlePaymentAsync(string orderId, string data)
    {
        _paymentStream[orderId] = (data, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
        await TryJoinAsync(orderId);
    }

    private async Task TryJoinAsync(string orderId)
    {
        if (_orderStream.TryGetValue(orderId, out var order) &&
            _paymentStream.TryGetValue(orderId, out var payment) &&
            Math.Abs(order.Ts - payment.Ts) <= JoinWindowMs)
        {
            var joined = JsonSerializer.Serialize(new { orderId, order = order.Data, payment = payment.Data, joinedAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() });
            await _producer.ProduceAsync("order-fulfillment", new Message<string, string> { Key = orderId, Value = joined });
            _orderStream.TryRemove(orderId, out _);
            _paymentStream.TryRemove(orderId, out _);
        }
    }

    private async Task HandleStatefulEventAsync(string userId, string eventJson)
    {
        var (events, _) = _userState.GetOrAdd(userId, _ => ([], 0));
        events.Add(eventJson);
        _userState[userId] = (events, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
        var enriched = JsonSerializer.Serialize(new { Event = eventJson, UserContext = new { TotalEvents = events.Count, LastUpdate = _userState[userId].LastUpdate } });
        await _producer.ProduceAsync("enriched-events", new Message<string, string> { Key = userId, Value = enriched });
    }
}

Common patterns:

  • Stream-Stream Join: correlate events from multiple streams
  • Stream-Table Join: enrich stream events with a lookup table
  • Windowed Aggregation: compute metrics over time windows
  • Stateful Map: maintain per-key state across events

Change Data Capture (CDC) with Debezium

CDC streams database changes as events, keeping data synchronized across systems:

// Debezium MySQL connector configuration
// Deployed via Kafka Connect (properties are flat key/value pairs)

const debeziumConfig = {
  name: 'mysql-orders-cdc',
  config: {
    'connector.class': 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname': 'mysql.example.com',
    'database.port': '3306',
    'database.user': 'debezium',
    'database.password': process.env.MYSQL_PASSWORD,
    'database.server.id': '184054',
    'database.include.list': 'orders_db',
    'table.include.list': 'orders_db.orders',
    'topic.prefix': 'mysql',
    'schema.history.internal.kafka.bootstrap.servers': 'kafka-1:9092',
    'schema.history.internal.kafka.topic': 'schema-changes.orders_db',
    transforms: 'route',
    'transforms.route.type': 'org.apache.kafka.connect.transforms.RegexRouter',
    'transforms.route.regex': '([^.]+)\\.([^.]+)\\.([^.]+)',
    'transforms.route.replacement': '$1-$3',
    'key.converter': 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter': 'org.apache.kafka.connect.json.JsonConverter',
  },
};

// CDC messages have structure:
// {
//   "before": { old values },
//   "after": { new values },
//   "source": { database metadata },
//   "op": "c|u|d" (create, update, delete),
//   "ts_ms": timestamp
// }

// Consume CDC events and synchronize read model
await consumer.run({
  eachMessage: async ({ message }) => {
    const change = JSON.parse(message.value);

    switch (change.op) {
      case 'c': // Create
      case 'u': // Update
        await syncReadModel('orders', change.after);
        break;
      case 'd': // Delete
        await deleteFromReadModel('orders', change.before.id);
        break;
    }

    // Emit domain events from CDC ("after" is null for deletes)
    const record = change.op === 'd' ? change.before : change.after;
    await producer.sendEvent('order-events', record.id, {
      eventType: change.op === 'd' ? 'OrderDeleted' : 'OrderUpdated',
      order: record,
      timestamp: change.ts_ms,
    });
  },
});
// Consume CDC events from Debezium and synchronize read model
@Service
public class CdcConsumerService {

    @KafkaListener(topics = "mysql-orders", groupId = "cdc-sync")
    public void consumeCdcEvent(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            var change = objectMapper.readValue(record.value(), Map.class);
            String op = (String) change.get("op");

            switch (op) {
                case "c": // Create
                case "u": // Update
                    syncReadModel("orders", (Map<String, Object>) change.get("after"));
                    break;
                case "d": // Delete
                    var before = (Map<String, Object>) change.get("before");
                    deleteFromReadModel("orders", (String) before.get("id"));
                    break;
            }

            // Emit domain events from CDC ("after" is null for deletes)
            var record = (Map<String, Object>) ("d".equals(op)
                ? change.get("before") : change.get("after"));
            String eventType = "d".equals(op) ? "OrderDeleted" : "OrderUpdated";
            producerService.sendEvent("order-events", (String) record.get("id"), Map.of(
                "eventType", eventType,
                "order", record,
                "timestamp", change.get("ts_ms")
            ));

            ack.acknowledge();
        } catch (Exception e) {
            log.error("Error processing CDC event", e);
            throw new RuntimeException(e);
        }
    }

    private void syncReadModel(String table, Map<String, Object> record) {
        jdbcTemplate.update(
            "INSERT INTO order_read_model (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?",
            record.get("id"), objectMapper.writeValueAsString(record),
            objectMapper.writeValueAsString(record));
    }

    private void deleteFromReadModel(String table, String id) {
        jdbcTemplate.update("DELETE FROM order_read_model WHERE id = ?", id);
    }
}
# Consume CDC events from Debezium and synchronize read model
from aiokafka import AIOKafkaConsumer
import json

class CdcConsumerService:
    def __init__(self, producer, db):
        self._producer = producer
        self._db = db

    async def run(self) -> None:
        consumer = AIOKafkaConsumer(
            "mysql-orders",
            bootstrap_servers="kafka-1:9092",
            group_id="cdc-sync",
        )
        await consumer.start()
        try:
            async for msg in consumer:
                await self._handle_cdc_event(json.loads(msg.value.decode()))
        finally:
            await consumer.stop()

    async def _handle_cdc_event(self, change: dict) -> None:
        op = change.get("op")

        match op:
            case "c" | "u":  # Create or Update
                await self._sync_read_model("orders", change["after"])
            case "d":  # Delete
                await self._delete_from_read_model("orders", change["before"]["id"])

        # Emit domain events from CDC
        after = change.get("after") or change.get("before", {})
        event_type = "OrderDeleted" if op == "d" else "OrderUpdated"
        await self._producer.send_event("order-events", after.get("id", ""), {
            "eventType": event_type,
            "order": after,
            "timestamp": change.get("ts_ms"),
        })

    async def _sync_read_model(self, table: str, record: dict) -> None:
        await self._db.execute(
            "INSERT INTO order_read_model (id, data) VALUES (:id, :data) "
            "ON CONFLICT (id) DO UPDATE SET data = :data",
            {"id": record["id"], "data": json.dumps(record)},
        )

    async def _delete_from_read_model(self, table: str, record_id: str) -> None:
        await self._db.execute(
            "DELETE FROM order_read_model WHERE id = :id", {"id": record_id}
        )
// Consume CDC events from Debezium and synchronize read model
public class CdcConsumerService : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly KafkaProducerService _producer;
    private readonly IDbConnection _db;

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        _consumer.Subscribe("mysql-orders");

        while (!ct.IsCancellationRequested)
        {
            var result = _consumer.Consume(ct);
            var change = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(result.Message.Value)!;
            await HandleCdcEventAsync(change);
        }
    }

    private async Task HandleCdcEventAsync(Dictionary<string, JsonElement> change)
    {
        var op = change["op"].GetString();

        switch (op)
        {
            case "c": // Create
            case "u": // Update
                var after = change["after"].Deserialize<Dictionary<string, object>>()!;
                await SyncReadModelAsync("orders", after);
                break;
            case "d": // Delete
                var before = change["before"].Deserialize<Dictionary<string, object>>()!;
                await DeleteFromReadModelAsync("orders", before["id"].ToString()!);
                break;
        }

        // Emit domain events from CDC
        var record = op == "d"
            ? change["before"].Deserialize<Dictionary<string, object>>()!
            : change["after"].Deserialize<Dictionary<string, object>>()!;
        var eventType = op == "d" ? "OrderDeleted" : "OrderUpdated";
        await _producer.SendEventAsync("order-events", record["id"].ToString()!, new
        {
            eventType,
            order = record,
            timestamp = change.ContainsKey("ts_ms") ? change["ts_ms"].GetInt64() : 0,
        });
    }

    private async Task SyncReadModelAsync(string table, Dictionary<string, object> record)
    {
        await _db.ExecuteAsync(
            "INSERT INTO order_read_model (id, data) VALUES (@Id, @Data) ON CONFLICT (id) DO UPDATE SET data = @Data",
            new { Id = record["id"], Data = JsonSerializer.Serialize(record) });
    }

    private async Task DeleteFromReadModelAsync(string table, string id) =>
        await _db.ExecuteAsync("DELETE FROM order_read_model WHERE id = @Id", new { Id = id });
}

Benefits:

  • Maintains eventual consistency across systems
  • Minimal impact on the source database (reads the transaction log)
  • Complete history of every change
  • Enables data synchronization into analytics warehouses
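
Because every Debezium envelope carries both `before` and `after` images, consumers can also compute field-level diffs without querying the source. A small helper — the envelope follows the structure shown earlier; the example row values are hypothetical:

```python
def changed_fields(change: dict) -> dict[str, tuple]:
    """Return a map of field -> (old, new) for a Debezium envelope."""
    before = change.get("before") or {}
    after = change.get("after") or {}
    return {
        field: (before.get(field), after.get(field))
        for field in set(before) | set(after)
        if before.get(field) != after.get(field)
    }

envelope = {
    "op": "u",
    "before": {"id": 7, "status": "pending", "total": 99.0},
    "after":  {"id": 7, "status": "paid",    "total": 99.0},
    "ts_ms": 1713430000000,
}
assert changed_fields(envelope) == {"status": ("pending", "paid")}
```

This is handy for emitting narrow domain events ("status changed") instead of whole-row updates.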

Monitoring: Consumer Lag, Throughput, and Partition Health

Observability is critical for production Kafka systems:

import pino from 'pino';

const logger = pino();

class KafkaMonitoring {
  async monitorConsumerLag(
    groupId: string,
    topic: string,
    pollIntervalMs: number = 30000
  ): Promise<void> {
    setInterval(async () => {
      const admin = kafka.admin();
      await admin.connect();

      try {
        const offsets = await admin.fetchOffsets({ groupId, topic });
        const topicMetadata = await admin.fetchTopicMetadata({ topics: [topic] });

        for (const partition of topicMetadata.topics[0].partitions) {
          const committed = offsets.find(
            (o) => o.partition === partition.partitionId
          );
          const consumerOffset = committed ? parseInt(committed.offset, 10) : 0;

          const partitionLogSize = await this.getPartitionLogSize(
            admin,
            topic,
            partition.partitionId
          );

          const lag = partitionLogSize - consumerOffset;

          logger.info(
            {
              groupId,
              topic,
              partition: partition.partitionId,
              consumerOffset,
              logSize: partitionLogSize,
              lag,
            },
            'Consumer lag metric'
          );

          // Alert if lag exceeds threshold
          if (lag > 10000) {
            logger.warn(
              {
                groupId,
                topic,
                partition: partition.partitionId,
                lag,
              },
              'High consumer lag detected'
            );
          }
        }
      } finally {
        await admin.disconnect();
      }
    }, pollIntervalMs);
  }

  async monitorThroughput(topic: string): Promise<void> {
    let lastOffset = 0;
    let lastTime = Date.now();

    setInterval(async () => {
      const admin = kafka.admin();
      await admin.connect();

      try {
        const topicMetadata = await admin.fetchTopicMetadata({ topics: [topic] });
        let totalOffset = 0;

        for (const partition of topicMetadata.topics[0].partitions) {
          const logSize = await this.getPartitionLogSize(
            admin,
            topic,
            partition.partitionId
          );
          totalOffset += logSize;
        }

        const currentTime = Date.now();
        const timeDiff = (currentTime - lastTime) / 1000; // seconds
        const offsetDiff = totalOffset - lastOffset;
        const throughput = offsetDiff / timeDiff;

        logger.info(
          {
            topic,
            totalOffset,
            throughput: `${Math.round(throughput)} events/sec`,
          },
          'Topic throughput'
        );

        lastOffset = totalOffset;
        lastTime = currentTime;
      } finally {
        await admin.disconnect();
      }
    }, 60000); // Every minute
  }

  private async getPartitionLogSize(
    admin: any,
    topic: string,
    partition: number
  ): Promise<number> {
    const offsets = await admin.fetchTopicOffsets(topic);
    const partitionOffsets = offsets.filter((o) => o.partition === partition);
    return partitionOffsets.length > 0 ? parseInt(partitionOffsets[0].high) : 0;
  }
}

// Use Prometheus for metrics export
import { register, Gauge, Counter } from 'prom-client';

const consumerLagGauge = new Gauge({
  name: 'kafka_consumer_lag',
  help: 'Consumer lag in number of messages',
  labelNames: ['group', 'topic', 'partition'],
  registers: [register],
});

const throughputCounter = new Counter({
  name: 'kafka_messages_processed_total',
  help: 'Total messages processed',
  labelNames: ['topic', 'partition'],
  registers: [register],
});
// Spring Boot with Micrometer + Prometheus
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import io.micrometer.core.instrument.*;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Service
public class KafkaMonitoringService {
    private final AdminClient adminClient;
    private final MeterRegistry meterRegistry;
    private static final Logger log = LoggerFactory.getLogger(KafkaMonitoringService.class);

    // Micrometer registers a gauge with a given name + tags only once,
    // so keep a mutable holder per partition and update it on each run
    private final Map<TopicPartition, AtomicLong> lagHolders = new ConcurrentHashMap<>();

    @Scheduled(fixedDelay = 30000)
    public void monitorConsumerLag() throws Exception {
        String groupId = "order-processors";

        ListConsumerGroupOffsetsResult offsetsResult = adminClient
            .listConsumerGroupOffsets(groupId);
        Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
            offsetsResult.partitionsToOffsetAndMetadata().get();

        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
            adminClient.listOffsets(
                consumerOffsets.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()))
            ).all().get();

        for (var entry : consumerOffsets.entrySet()) {
            var tp = entry.getKey();
            long consumerOffset = entry.getValue().offset();
            long logSize = endOffsets.get(tp).offset();
            long lag = logSize - consumerOffset;

            log.info("Consumer lag — group: {}, topic: {}, partition: {}, lag: {}",
                groupId, tp.topic(), tp.partition(), lag);

            if (lag > 10000) {
                log.warn("High consumer lag detected — partition: {}, lag: {}",
                    tp.partition(), lag);
            }

            // Export to Prometheus via Micrometer
            lagHolders.computeIfAbsent(tp, key ->
                meterRegistry.gauge("kafka_consumer_lag",
                    Tags.of("group", groupId,
                            "topic", key.topic(),
                            "partition", String.valueOf(key.partition())),
                    new AtomicLong())
            ).set(lag);
        }
    }

    @Scheduled(fixedDelay = 60000)
    public void monitorThroughput() throws Exception {
        // Compare end offsets over time to calculate throughput
        String topic = "orders";
        var partitions = adminClient.describeTopics(List.of(topic))
            .allTopicNames().get().get(topic).partitions();

        var tps = partitions.stream()
            .map(p -> new TopicPartition(topic, p.partition()))
            .collect(Collectors.toList());

        var offsets = adminClient.listOffsets(
            tps.stream().collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()))
        ).all().get();

        long total = offsets.values().stream().mapToLong(r -> r.offset()).sum();
        log.info("Topic {} total offset: {}", topic, total);
    }
}
# pip install aiokafka prometheus-client
from aiokafka import AIOKafkaConsumer, TopicPartition
from aiokafka.admin import AIOKafkaAdminClient
from prometheus_client import Gauge, Counter, start_http_server
import asyncio
import logging

logger = logging.getLogger(__name__)

consumer_lag_gauge = Gauge(
    "kafka_consumer_lag",
    "Consumer lag in number of messages",
    ["group", "topic", "partition"],
)
throughput_counter = Counter(
    "kafka_messages_processed_total",
    "Total messages processed",
    ["topic", "partition"],
)

class KafkaMonitoringService:
    def __init__(self, brokers: list[str]):
        self.brokers = brokers

    async def monitor_consumer_lag(
        self, group_id: str, topic: str, poll_interval_s: int = 30
    ) -> None:
        admin = AIOKafkaAdminClient(bootstrap_servers=",".join(self.brokers))
        # aiokafka's admin client cannot read end offsets; use a consumer
        consumer = AIOKafkaConsumer(bootstrap_servers=",".join(self.brokers))
        await admin.start()
        await consumer.start()
        try:
            while True:
                consumer_offsets = await admin.list_consumer_group_offsets(group_id)
                tps = [tp for tp in consumer_offsets if tp.topic == topic]
                end_offsets = await consumer.end_offsets(tps)

                for tp in tps:
                    consumer_offset = consumer_offsets[tp].offset
                    log_size = end_offsets[tp]
                    lag = log_size - consumer_offset

                    logger.info("Consumer lag — group: %s, topic: %s, partition: %d, lag: %d",
                        group_id, tp.topic, tp.partition, lag)

                    consumer_lag_gauge.labels(
                        group=group_id, topic=tp.topic, partition=str(tp.partition)
                    ).set(lag)

                    if lag > 10000:
                        logger.warning("High consumer lag detected — partition: %d, lag: %d",
                            tp.partition, lag)

                await asyncio.sleep(poll_interval_s)
        finally:
            await consumer.stop()
            await admin.close()

    async def monitor_throughput(self, topic: str) -> None:
        consumer = AIOKafkaConsumer(bootstrap_servers=",".join(self.brokers))
        await consumer.start()
        try:
            last_total = None
            while True:
                partitions = await consumer.partitions_for_topic(topic) or set()
                tps = [TopicPartition(topic, p) for p in partitions]
                end_offsets = await consumer.end_offsets(tps)
                total = sum(end_offsets.values())
                if last_total is not None:
                    throughput = (total - last_total) / 60
                    logger.info("Topic %s throughput: %.1f events/sec", topic, throughput)
                last_total = total
                await asyncio.sleep(60)
        finally:
            await consumer.stop()
// NuGet: Confluent.Kafka, prometheus-net
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Prometheus;
using Microsoft.Extensions.Hosting;

public class KafkaMonitoringService : BackgroundService
{
    private readonly IAdminClient _adminClient;
    private readonly ILogger<KafkaMonitoringService> _logger;

    private static readonly Gauge ConsumerLagGauge = Metrics.CreateGauge(
        "kafka_consumer_lag",
        "Consumer lag in number of messages",
        new GaugeConfiguration { LabelNames = new[] { "group", "topic", "partition" } });

    private static readonly Counter ThroughputCounter = Metrics.CreateCounter(
        "kafka_messages_processed_total",
        "Total messages processed",
        new CounterConfiguration { LabelNames = new[] { "topic", "partition" } });

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            await MonitorConsumerLagAsync("order-processors", "orders");
            await Task.Delay(30_000, ct);
        }
    }

    private async Task MonitorConsumerLagAsync(string groupId, string topic)
    {
        try
        {
            var groupOffsets = await _adminClient.ListConsumerGroupOffsetsAsync(
                new[] { new ConsumerGroupTopicPartitions(groupId) });

            foreach (var tpo in groupOffsets.First().Partitions)
            {
                var endOffsets = await _adminClient.ListOffsetsAsync(
                    new[] { new TopicPartitionOffsetSpec(tpo.TopicPartition, OffsetSpec.Latest()) },
                    new ListOffsetsOptions());

                long consumerOffset = tpo.Offset.Value;
                long logSize = endOffsets.First().Offset.Value;
                long lag = logSize - consumerOffset;

                _logger.LogInformation(
                    "Consumer lag — group: {Group}, partition: {Partition}, lag: {Lag}",
                    groupId, tpo.Partition.Value, lag);

                ConsumerLagGauge
                    .WithLabels(groupId, topic, tpo.Partition.Value.ToString())
                    .Set(lag);

                if (lag > 10000)
                    _logger.LogWarning("High consumer lag on partition {P}: {Lag}",
                        tpo.Partition.Value, lag);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error monitoring consumer lag");
        }
    }
}

Key metrics:

  • Consumer lag: the gap between the latest (log-end) offset and the committed consumer offset
  • Throughput: events processed per second
  • Rebalancing time: duration of partition reassignments
  • Partition health: leader/ISR status, disk usage
  • Broker metrics: CPU, memory, network I/O
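Consumer lag from the list above is just a per-partition subtraction; a minimal pure-Python sketch (the dict shapes here are hypothetical snapshots, not a client API) shows how the monitoring services earlier derive it:

```python
# Compute per-partition consumer lag: log-end offset minus committed offset.
# Inputs are hypothetical {partition: offset} snapshots, e.g. fetched via an AdminClient.

def compute_lag(end_offsets: dict, committed: dict) -> dict:
    """Return {partition: lag}; a partition with no commit counts as fully lagged."""
    lag = {}
    for partition, log_end in end_offsets.items():
        committed_offset = committed.get(partition, 0)  # no commit yet → lag = log end
        lag[partition] = max(log_end - committed_offset, 0)
    return lag

end = {0: 1_500, 1: 2_000, 2: 900}
done = {0: 1_400, 1: 2_000}           # partition 2 has no committed offset
print(compute_lag(end, done))         # {0: 100, 1: 0, 2: 900}
```

Summing the per-partition values gives the group-level lag that the alert thresholds later in this section refer to.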

Kafka vs Message Queues Comparison

| Aspect | Kafka | RabbitMQ | AWS SQS |
|---|---|---|---|
| Model | Append-only log | Message queue | Message queue |
| Durability | Persistent by default | Configurable | Persistent |
| Message retention | Configurable duration | Until consumed | 14 days max |
| Replay capability | Native (rewind offset) | No (messages deleted) | No |
| Ordering | Per-partition guarantee | Per-queue guarantee | Best effort |
| Throughput | Millions msg/sec | Hundreds of thousands | High but eventually consistent |
| Latency | Milliseconds | Milliseconds | Milliseconds |
| Scaling | Partitioning | Clustering | Auto-scaled |
| Consumer model | Consumer groups | Competing consumers | Visibility timeout |
| Multi-tenancy | Topics + ACLs | Virtual hosts | Separate queues |
| Use case | Event streaming, analytics | Task distribution, pub/sub | Decoupled microservices |

Choose Kafka for:

  • Event streaming และ sourcing
  • Analytics และ data warehousing
  • Multi-consumer replayed streams
  • High-throughput systems
  • Time-series data

Choose message queues for:

  • Simple request/response patterns
  • Task distribution และ job processing
  • Competing consumer model
  • Lower operational complexity

Production Deployment Checklist

Cluster Sizing

// Sizing formula considerations:
// 1. Throughput → partition/broker count
const requiredPartitions = Math.ceil(
  targetThroughput / MAX_PRODUCER_THROUGHPUT_PER_PARTITION
);
const requiredBrokers = Math.ceil(
  requiredPartitions / (PARTITIONS_PER_BROKER || 4000)
);

// 2. Storage: Retention period × daily volume
const dailyVolume = targetThroughput * 86400; // events/day
const dataSize = dailyVolume * AVERAGE_MESSAGE_SIZE; // bytes
const retentionDays = 30;
const requiredStorage = dataSize * retentionDays * REPLICATION_FACTOR;

// 3. Memory: OS page cache for hot partitions
// Rule of thumb: ~1GB per 100k partitions (working set)

// Example 3-broker cluster:
// - Throughput: 100k events/sec
// - Retention: 30 days
// - Replication: 3
// - Message size: 1KB average
// → ~30 partitions, ~265TB storage per broker (≈8.8TB/day × 30 days × RF 3 over 3 brokers), 64GB memory
// Sizing formula in Java
import java.math.BigDecimal;

public class KafkaSizingCalculator {

    // 1. Throughput → partition/broker count
    public static int requiredPartitions(long targetThroughput, long maxThroughputPerPartition) {
        return (int) Math.ceil((double) targetThroughput / maxThroughputPerPartition);
    }

    public static int requiredBrokers(int partitions, int partitionsPerBroker) {
        return (int) Math.ceil((double) partitions / partitionsPerBroker);
    }

    // 2. Storage: retention period × daily volume
    public static long requiredStorageBytes(
            long targetThroughput, long avgMessageSizeBytes,
            int retentionDays, int replicationFactor) {
        long dailyVolume = targetThroughput * 86_400L;
        long dataSize    = dailyVolume * avgMessageSizeBytes;
        return dataSize * retentionDays * replicationFactor;
    }

    // Example 3-broker cluster:
    // targetThroughput=100_000 events/sec, 1KB messages, 30-day retention, RF=3
    // → ~30 partitions, 3 brokers, ~265TB storage per broker, 64GB memory
}
# Sizing formula in Python
import math

def required_partitions(target_throughput: int, max_throughput_per_partition: int) -> int:
    """1. Throughput → partition count."""
    return math.ceil(target_throughput / max_throughput_per_partition)

def required_brokers(partitions: int, partitions_per_broker: int = 4000) -> int:
    return math.ceil(partitions / partitions_per_broker)

def required_storage_bytes(
    target_throughput: int,
    avg_message_size_bytes: int,
    retention_days: int = 30,
    replication_factor: int = 3,
) -> int:
    """2. Storage: retention period × daily volume."""
    daily_volume = target_throughput * 86_400
    data_size    = daily_volume * avg_message_size_bytes
    return data_size * retention_days * replication_factor

# Example 3-broker cluster:
# target=100_000 events/sec, 1KB message, 30-day retention, RF=3
# → ~30 partitions, 3 brokers, ~265TB storage per broker, 64GB memory
// Sizing formula in C#
public static class KafkaSizingCalculator
{
    // 1. Throughput → partition/broker count
    public static int RequiredPartitions(long targetThroughput, long maxThroughputPerPartition) =>
        (int)Math.Ceiling((double)targetThroughput / maxThroughputPerPartition);

    public static int RequiredBrokers(int partitions, int partitionsPerBroker = 4000) =>
        (int)Math.Ceiling((double)partitions / partitionsPerBroker);

    // 2. Storage: retention period × daily volume
    public static long RequiredStorageBytes(
        long targetThroughput,
        long avgMessageSizeBytes,
        int retentionDays = 30,
        int replicationFactor = 3)
    {
        long dailyVolume = targetThroughput * 86_400L;
        long dataSize    = dailyVolume * avgMessageSizeBytes;
        return dataSize * retentionDays * replicationFactor;
    }

    // Example 3-broker cluster:
    // targetThroughput=100_000, 1KB message, 30-day retention, RF=3
    // → ~30 partitions, 3 brokers, ~265TB storage per broker, 64GB memory
}
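Plugging the example numbers into the formulas above shows the scale involved; this self-contained check assumes 1KB average messages and a hypothetical per-partition ceiling of ~3,400 events/sec:

```python
import math

# Recompute the example cluster from the sizing formulas above.
target_throughput = 100_000          # events/sec
avg_msg = 1024                       # bytes, 1KB average
retention_days = 30
replication = 3
per_partition_throughput = 3_400     # hypothetical per-partition ceiling

partitions = math.ceil(target_throughput / per_partition_throughput)
daily_bytes = target_throughput * 86_400 * avg_msg
total_bytes = daily_bytes * retention_days * replication

print(partitions)                    # 30
print(round(daily_bytes / 1e12, 1))  # 8.8 TB/day before replication
print(round(total_bytes / 1e12))     # 796 TB across the cluster (~265TB per broker)
```

The point of the exercise: retention dominates. At this throughput the cluster is sized by disk, not CPU, which is why the retention and cleanup policies in the next section matter so much.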

Retention and Cleanup Policies

// Topic configuration
const topicConfig = {
  name: 'orders',
  numPartitions: 30,
  replicationFactor: 3,
  configEntries: [
    // Retention by time
    { name: 'retention.ms', value: '2592000000' }, // 30 days
    // Retention by size
    { name: 'retention.bytes', value: '1099511627776' }, // 1TB
    // Log cleanup policy
    { name: 'cleanup.policy', value: 'delete' }, // or 'compact'
    // Log compaction ratio (if cleanup.policy=compact)
    { name: 'min.compaction.lag.ms', value: '86400000' }, // 1 day
    { name: 'delete.retention.ms', value: '86400000' },
    // Segment size (affects compaction frequency)
    { name: 'segment.ms', value: '86400000' }, // 1 day
    { name: 'segment.bytes', value: '1073741824' }, // 1GB
  ],
};
// Topic creation with retention policy via Spring Kafka / AdminClient
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.kafka.config.TopicBuilder;

@Bean
public NewTopic ordersTopic() {
    return TopicBuilder.name("orders")
        .partitions(30)
        .replicas(3)
        .config(TopicConfig.RETENTION_MS_CONFIG,       "2592000000")  // 30 days
        .config(TopicConfig.RETENTION_BYTES_CONFIG,    "1099511627776") // 1TB
        .config(TopicConfig.CLEANUP_POLICY_CONFIG,     TopicConfig.CLEANUP_POLICY_DELETE)
        .config(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "86400000")
        .config(TopicConfig.DELETE_RETENTION_MS_CONFIG,   "86400000")
        .config(TopicConfig.SEGMENT_MS_CONFIG,         "86400000")   // 1 day
        .config(TopicConfig.SEGMENT_BYTES_CONFIG,      "1073741824") // 1GB
        .build();
}
# Topic creation with retention policy via confluent_kafka AdminClient
from confluent_kafka.admin import AdminClient, NewTopic, ConfigResource, ConfigEntry

admin = AdminClient({"bootstrap.servers": "kafka-1:9092"})

new_topic = NewTopic(
    "orders",
    num_partitions=30,
    replication_factor=3,
    config={
        "retention.ms":          "2592000000",   # 30 days
        "retention.bytes":       "1099511627776", # 1TB
        "cleanup.policy":        "delete",        # or "compact"
        "min.compaction.lag.ms": "86400000",
        "delete.retention.ms":   "86400000",
        "segment.ms":            "86400000",      # 1 day
        "segment.bytes":         "1073741824",    # 1GB
    },
)
admin.create_topics([new_topic])
// Topic creation with retention policy via Confluent.Kafka AdminClient
using Confluent.Kafka;
using Confluent.Kafka.Admin;

using var adminClient = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "kafka-1:9092" }).Build();

await adminClient.CreateTopicsAsync(new[]
{
    new TopicSpecification
    {
        Name              = "orders",
        NumPartitions     = 30,
        ReplicationFactor = 3,
        Configs = new Dictionary<string, string>
        {
            ["retention.ms"]          = "2592000000",   // 30 days
            ["retention.bytes"]       = "1099511627776", // 1TB
            ["cleanup.policy"]        = "delete",
            ["min.compaction.lag.ms"] = "86400000",
            ["delete.retention.ms"]   = "86400000",
            ["segment.ms"]            = "86400000",     // 1 day
            ["segment.bytes"]         = "1073741824",   // 1GB
        },
    },
});
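The retention values above are raw milliseconds and bytes; a tiny pair of helpers (hypothetical names, not part of any Kafka client) makes configs like these self-documenting and avoids magic-number typos:

```python
# Convert human-friendly units to the raw string values Kafka topic configs expect.

def days_to_ms(days: int) -> str:
    return str(days * 24 * 60 * 60 * 1000)

def gib_to_bytes(gib: int) -> str:
    return str(gib * 1024 ** 3)

config = {
    "retention.ms":    days_to_ms(30),      # "2592000000"
    "retention.bytes": gib_to_bytes(1024),  # "1099511627776" (1TiB)
    "segment.ms":      days_to_ms(1),       # "86400000"
    "segment.bytes":   gib_to_bytes(1),     # "1073741824"
}
```

The same dict can be passed as the `config`/`Configs` argument in any of the topic-creation snippets above.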

Compression and Tuning

// Producer tuning for throughput
const producerConfig = {
  compression: CompressionTypes.Snappy, // or Gzip, LZ4, Zstd
  linger: 10, // Wait 10ms to batch messages
  batchSize: 16384, // 16KB per batch
  maxInFlightRequests: 5, // Pipeline requests
  acks: 'all', // Full durability
  requestTimeout: 30000,
  retries: 5,
};

// Broker tuning
const brokerConfig = {
  'num.network.threads': 8, // Network threads
  'num.io.threads': 8, // I/O threads
  'socket.send.buffer.bytes': 102400, // 100KB
  'socket.receive.buffer.bytes': 102400,
  'socket.request.max.bytes': 104857600, // 100MB
  'log.flush.interval.messages': 10000,
  'log.flush.interval.ms': 1000,
};
// Spring Kafka: producer tuning for throughput
@Bean
public ProducerFactory<String, String> tunedProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,              "snappy");
    config.put(ProducerConfig.LINGER_MS_CONFIG,                     10);        // batch window
    config.put(ProducerConfig.BATCH_SIZE_CONFIG,                    16384);     // 16KB
    config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
    config.put(ProducerConfig.ACKS_CONFIG,                          "all");
    config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,            30000);
    config.put(ProducerConfig.RETRIES_CONFIG,                       5);
    return new DefaultKafkaProducerFactory<>(config);
}

// Broker tuning is set in server.properties (not in Java code):
// num.network.threads=8
// num.io.threads=8
// socket.send.buffer.bytes=102400
// socket.receive.buffer.bytes=102400
// socket.request.max.bytes=104857600
// log.flush.interval.messages=10000
// log.flush.interval.ms=1000
# aiokafka: producer tuning for throughput
from aiokafka import AIOKafkaProducer

producer = AIOKafkaProducer(
    bootstrap_servers="kafka-1:9092",
    compression_type="snappy",          # or gzip, lz4, zstd
    linger_ms=10,                       # wait 10ms to batch
    max_batch_size=16384,               # 16KB per batch
    max_in_flight_requests_per_connection=5,
    acks="all",                         # full durability
    request_timeout_ms=30000,
    retry_backoff_ms=100,
)

# Broker tuning is set in server.properties (not in Python code):
# num.network.threads=8
# num.io.threads=8
# socket.send.buffer.bytes=102400
# socket.receive.buffer.bytes=102400
# socket.request.max.bytes=104857600
# log.flush.interval.messages=10000
# log.flush.interval.ms=1000
// Confluent.Kafka: producer tuning for throughput
var producerConfig = new ProducerConfig
{
    CompressionType      = CompressionType.Snappy, // or Gzip, Lz4, Zstd
    LingerMs             = 10,        // wait 10ms to batch
    BatchSize            = 16384,     // 16KB per batch
    MaxInFlight          = 5,
    Acks                 = Acks.All,  // full durability
    MessageTimeoutMs     = 30000,
    // Retries handled via EnableIdempotence or retry policy
};

// Broker tuning is set in server.properties (not in C# code):
// num.network.threads=8
// num.io.threads=8
// socket.send.buffer.bytes=102400
// socket.receive.buffer.bytes=102400
// socket.request.max.bytes=104857600
// log.flush.interval.messages=10000
// log.flush.interval.ms=1000

Monitoring and Alerting

// Critical alerts
const alerts = [
  {
    name: 'HighConsumerLag',
    condition: 'consumer_lag > 100000',
    severity: 'critical',
    action: 'Page on-call engineer',
  },
  {
    name: 'BrokerDown',
    condition: 'broker_up == 0',
    severity: 'critical',
    action: 'Page on-call engineer',
  },
  {
    name: 'ISRShrinking',
    condition: 'isr_size < replication_factor',
    severity: 'warning',
    action: 'Check broker health',
  },
  {
    name: 'ProducerErrors',
    condition: 'producer_errors_total > 10',
    severity: 'warning',
    action: 'Investigate producer failures',
  },
  {
    name: 'HighDiskUsage',
    condition: 'disk_usage_percent > 80',
    severity: 'warning',
    action: 'Scale cluster or reduce retention',
  },
];
// Alert definitions as Java records — wire into your alerting system
public record AlertDefinition(
    String name, String condition, String severity, String action) {}

public class KafkaAlertDefinitions {
    public static final List<AlertDefinition> ALERTS = List.of(
        new AlertDefinition("HighConsumerLag",
            "consumer_lag > 100000", "critical", "Page on-call engineer"),
        new AlertDefinition("BrokerDown",
            "broker_up == 0", "critical", "Page on-call engineer"),
        new AlertDefinition("ISRShrinking",
            "isr_size < replication_factor", "warning", "Check broker health"),
        new AlertDefinition("ProducerErrors",
            "producer_errors_total > 10", "warning", "Investigate producer failures"),
        new AlertDefinition("HighDiskUsage",
            "disk_usage_percent > 80", "warning", "Scale cluster or reduce retention")
    );
}
# Alert definitions — wire into your alerting system (Prometheus AlertManager, PagerDuty, etc.)
from dataclasses import dataclass

@dataclass
class AlertDefinition:
    name: str
    condition: str
    severity: str
    action: str

KAFKA_ALERTS = [
    AlertDefinition("HighConsumerLag",
        "consumer_lag > 100000", "critical", "Page on-call engineer"),
    AlertDefinition("BrokerDown",
        "broker_up == 0", "critical", "Page on-call engineer"),
    AlertDefinition("ISRShrinking",
        "isr_size < replication_factor", "warning", "Check broker health"),
    AlertDefinition("ProducerErrors",
        "producer_errors_total > 10", "warning", "Investigate producer failures"),
    AlertDefinition("HighDiskUsage",
        "disk_usage_percent > 80", "warning", "Scale cluster or reduce retention"),
]
// Alert definitions — wire into your alerting system (Prometheus AlertManager, PagerDuty, etc.)
public record AlertDefinition(
    string Name, string Condition, string Severity, string Action);

public static class KafkaAlertDefinitions
{
    public static readonly IReadOnlyList<AlertDefinition> Alerts =
    [
        new("HighConsumerLag",
            "consumer_lag > 100000", "critical", "Page on-call engineer"),
        new("BrokerDown",
            "broker_up == 0", "critical", "Page on-call engineer"),
        new("ISRShrinking",
            "isr_size < replication_factor", "warning", "Check broker health"),
        new("ProducerErrors",
            "producer_errors_total > 10", "warning", "Investigate producer failures"),
        new("HighDiskUsage",
            "disk_usage_percent > 80", "warning", "Scale cluster or reduce retention"),
    ];
}
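The definitions above only describe alerts; a minimal evaluator (a hypothetical sketch, not tied to any alerting backend) shows how a metrics snapshot could be checked against conditions of the `"<metric> <op> <threshold>"` form used here:

```python
import operator
import re

# Evaluate alert conditions like "consumer_lag > 100000" against a metrics snapshot.
OPS = {">": operator.gt, "<": operator.lt, "==": operator.eq}

def fired_alerts(alerts: list, metrics: dict) -> list:
    """Return names of alerts whose condition holds for the given snapshot."""
    fired = []
    for alert in alerts:
        metric, op, threshold = re.split(r"\s+", alert["condition"])
        value = metrics.get(metric)
        # The threshold may reference another metric (e.g. replication_factor).
        limit = metrics.get(threshold)
        if limit is None:
            limit = float(threshold)
        if value is not None and OPS[op](value, limit):
            fired.append(alert["name"])
    return fired

alerts = [
    {"name": "HighConsumerLag", "condition": "consumer_lag > 100000"},
    {"name": "ISRShrinking", "condition": "isr_size < replication_factor"},
]
snapshot = {"consumer_lag": 250_000, "isr_size": 2, "replication_factor": 3}
print(fired_alerts(alerts, snapshot))  # ['HighConsumerLag', 'ISRShrinking']
```

In production you would express the same conditions as Prometheus AlertManager rules, but the evaluation semantics are exactly this.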

Security

// TLS/SSL configuration
const kafkaConfig = {
  ssl: {
    rejectUnauthorized: true, // Verify broker certificates (never disable in production)
    ca: [readFileSync('/path/to/ca-cert.pem', 'utf-8')],
    cert: readFileSync('/path/to/client-cert.pem', 'utf-8'),
    key: readFileSync('/path/to/client-key.pem', 'utf-8'),
  },
  sasl: {
    mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
    username: process.env.KAFKA_USER,
    password: process.env.KAFKA_PASSWORD,
  },
};

// ACL setup (broker-side)
const acls = [
  {
    resourceType: 'TOPIC',
    resourceName: 'orders',
    principal: 'User:order-service',
    operation: 'WRITE',
    permissionType: 'ALLOW',
  },
  {
    resourceType: 'TOPIC',
    resourceName: 'orders',
    principal: 'User:order-processor',
    operation: 'READ',
    permissionType: 'ALLOW',
  },
];
// Spring Kafka: TLS/SSL + SASL configuration
@Bean
public ProducerFactory<String, String> secureProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,  "SASL_SSL");
    config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,     "/path/to/truststore.jks");
    config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,     System.getenv("TRUSTSTORE_PASS"));
    config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,       "/path/to/keystore.jks");
    config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,       System.getenv("KEYSTORE_PASS"));
    config.put(SaslConfigs.SASL_MECHANISM,                    "PLAIN");
    config.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"" + System.getenv("KAFKA_USER") + "\" " +
        "password=\"" + System.getenv("KAFKA_PASSWORD") + "\";");
    return new DefaultKafkaProducerFactory<>(config);
}

// ACL creation via AdminClient
public void createAcls(AdminClient adminClient) throws Exception {
    var writeAcl = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "orders", PatternType.LITERAL),
        new AccessControlEntry("User:order-service", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
    var readAcl = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "orders", PatternType.LITERAL),
        new AccessControlEntry("User:order-processor", "*", AclOperation.READ, AclPermissionType.ALLOW));
    adminClient.createAcls(List.of(writeAcl, readAcl)).all().get();
}
# aiokafka: TLS/SSL + SASL configuration
import ssl
import os
from aiokafka import AIOKafkaProducer

ssl_context = ssl.create_default_context(cafile="/path/to/ca-cert.pem")
ssl_context.load_cert_chain(
    certfile="/path/to/client-cert.pem",
    keyfile="/path/to/client-key.pem",
)

producer = AIOKafkaProducer(
    bootstrap_servers="kafka-1:9092",
    security_protocol="SASL_SSL",
    ssl_context=ssl_context,
    sasl_mechanism="PLAIN",          # or SCRAM-SHA-256, SCRAM-SHA-512
    sasl_plain_username=os.getenv("KAFKA_USER", ""),
    sasl_plain_password=os.getenv("KAFKA_PASSWORD", ""),
)

# ACL creation via confluent_kafka AdminClient
from confluent_kafka.admin import AdminClient, AclBinding, AclOperation, AclPermissionType, ResourceType, ResourcePatternType

admin = AdminClient({"bootstrap.servers": "kafka-1:9092"})
acls = [
    AclBinding(ResourceType.TOPIC, "orders", ResourcePatternType.LITERAL,
               "User:order-service",   "*", AclOperation.WRITE, AclPermissionType.ALLOW),
    AclBinding(ResourceType.TOPIC, "orders", ResourcePatternType.LITERAL,
               "User:order-processor", "*", AclOperation.READ,  AclPermissionType.ALLOW),
]
admin.create_acls(acls)
// Confluent.Kafka: TLS/SSL + SASL configuration
var secureProducerConfig = new ProducerConfig
{
    BootstrapServers   = "kafka-1:9092",
    SecurityProtocol   = SecurityProtocol.SaslSsl,
    SslCaLocation      = "/path/to/ca-cert.pem",
    SslCertificateLocation = "/path/to/client-cert.pem",
    SslKeyLocation     = "/path/to/client-key.pem",
    SaslMechanism      = SaslMechanism.Plain, // or ScramSha256, ScramSha512
    SaslUsername       = Environment.GetEnvironmentVariable("KAFKA_USER") ?? "",
    SaslPassword       = Environment.GetEnvironmentVariable("KAFKA_PASSWORD") ?? "",
};

// ACL creation via AdminClient
using var adminClient = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "kafka-1:9092" }).Build();

await adminClient.CreateAclsAsync(new[]
{
    new AclBinding
    {
        Pattern = new ResourcePattern { Type = ResourceType.Topic, Name = "orders", ResourcePatternType = ResourcePatternType.Literal },
        Entry = new AccessControlEntry { Principal = "User:order-service", Host = "*", Operation = AclOperation.Write, PermissionType = AclPermissionType.Allow },
    },
    new AclBinding
    {
        Pattern = new ResourcePattern { Type = ResourceType.Topic, Name = "orders", ResourcePatternType = ResourcePatternType.Literal },
        Entry = new AccessControlEntry { Principal = "User:order-processor", Host = "*", Operation = AclOperation.Read, PermissionType = AclPermissionType.Allow },
    },
});

Operations and Maintenance

Regular tasks:

  • Monitor consumer lag and throughput
  • Check broker and ISR health
  • Review log compaction progress
  • Test disaster recovery (broker failure scenarios)
  • Rotate credentials periodically
  • Update Kafka version safely (rolling upgrade)
  • Archive old data to cold storage
  • Plan capacity for growth

Incident response:

  • Consumer lag spike: check consumer health, increase parallelism
  • Message loss: investigate producer acks and replication settings
  • High latency: check broker CPU/memory and network I/O
  • Rebalancing storms: increase session timeouts, fix crashing consumers
  • Disk full: scale the cluster or reduce the retention window

Conclusion

Kafka has reshaped how engineers build scalable, event-driven systems. The append-only log model unlocks patterns that are impossible with traditional message queues, from event sourcing to CQRS to streaming analytics.

Key takeaways:

  1. Think in events: event streams are the natural abstraction for state changes and time-series data
  2. Plan for scale: design partition strategy and replication upfront
  3. Embrace idempotency: handle at-least-once semantics with idempotent processing
  4. Instrument thoroughly: consumer lag and broker health are essential visibility
  5. Practice operations: test failure scenarios and disaster recovery regularly
  6. Evolve schemas safely: use Schema Registry for schema versioning
  7. Measure and optimize: profile producers and consumers for bottlenecks

Investing in understanding Kafka pays dividends when building systems that must process millions of events reliably and scale to handle growth.





Written by พลากร วรมงคล

Software Engineer Specialist with over 20 years of experience, writing about architecture, performance, and building production systems.

