Deep Dive: Event Streaming with Kafka
Event streaming has become the backbone of modern distributed systems, and Apache Kafka stands out as the industry standard for building real-time data pipelines and large-scale streaming applications. This guide explores the architectural foundations, implementation patterns, and production practices that make Kafka indispensable for event-driven architectures.
What Is Event Streaming
Event streaming is the practice of capturing, processing, and reacting to data in motion, treating events as the source of truth in your system. Unlike traditional message queues, which consume and discard messages, event streaming systems maintain an immutable, append-only log of events that applications can process independently and replay at any time.
The Append-Only Log Model
At its core, Kafka is a distributed, partitioned, replicated commit log. Think of it as an append-only data structure in which events are written sequentially and never modified. This foundational design unlocks several powerful capabilities (a minimal sketch follows the list below):
- Replayability: rewind and reprocess historical events from any point in time
- Audit trail: a complete record of state changes for compliance and debugging
- Decoupling: producers and consumers operate independently, with no coordination required
- Scalability: multiple consumers process the same events without contention
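To make the model concrete, here is a minimal, illustrative append-only log in TypeScript. It is a sketch of the abstraction only, not Kafka's actual storage engine:
// Illustrative sketch of an append-only log; not Kafka's internals.
interface LogEvent {
offset: number;
key: string;
value: string;
}
class AppendOnlyLog {
private events: LogEvent[] = [];
// Writes always go to the end; existing entries are never modified.
append(key: string, value: string): number {
const offset = this.events.length;
this.events.push({ offset, key, value });
return offset;
}
// Any reader can independently replay from any offset.
readFrom(offset: number): LogEvent[] {
return this.events.slice(offset);
}
}
const log = new AppendOnlyLog();
log.append('order-1', 'created');
log.append('order-1', 'paid');
console.log(log.readFrom(0)); // full replay
console.log(log.readFrom(1)); // resume mid-stream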
Topics, Partitions, and Consumer Groups
A Kafka topic is a named feed of events organized as an ordered sequence. Topics are partitioned across multiple brokers for parallelism; each partition is an independent append-only log. Events are assigned to partitions using a partitioning strategy (typically a hash of the key), which guarantees ordering within a partition.
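As an illustration, the default assignment can be thought of as hashing the key modulo the partition count. This simplified sketch captures the idea; Kafka's default partitioner actually uses murmur2 hashing:
// Simplified illustration; Kafka's default partitioner uses murmur2.
function assignPartition(key: string, numPartitions: number): number {
let hash = 0;
for (const ch of key) {
hash = (hash * 31 + ch.charCodeAt(0)) | 0; // simple string hash
}
return Math.abs(hash) % numPartitions;
}
// The same key always lands on the same partition, preserving per-key order.
const partition = assignPartition('customer-123', 6);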
Consumer groups are sets of consumers that cooperate to process a topic. Kafka assigns partitions to the consumers within a group, ensuring each partition is read by exactly one consumer in that group. Different consumer groups can process the same topic independently at different offsets.
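For example, with KafkaJS two services consume the same topic independently simply by declaring different groupId values (service and topic names here are illustrative):
import { Kafka } from 'kafkajs';
const kafka = new Kafka({ clientId: 'demo', brokers: ['kafka-1:9092'] });
// Each group tracks its own offsets, so both groups receive every event.
const billing = kafka.consumer({ groupId: 'billing-service' });
const analytics = kafka.consumer({ groupId: 'analytics-service' });
await billing.connect();
await analytics.connect();
await billing.subscribe({ topics: ['orders'] });
await analytics.subscribe({ topics: ['orders'] });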
Offset Management and Replay
Each consumer maintains an offset, a position in a partition's log that marks the next message to read. By storing offsets, consumers can resume where they left off and reprocess events as needed. This enables replay from a specific point for analytics, backfilling new services, or recovering from processing failures.
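In KafkaJS, for instance, replay is a matter of seeking a running consumer to an earlier offset, or resetting a group's committed offsets while the group is disconnected (topic and group names are illustrative):
// Option 1: seek a running consumer back to offset 0 on one partition
consumer.seek({ topic: 'orders', partition: 0, offset: '0' });
// Option 2: reset the group's committed offsets while the group is inactive
const admin = kafka.admin();
await admin.connect();
await admin.resetOffsets({ groupId: 'order-processors', topic: 'orders', earliest: true });
await admin.disconnect();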
How Kafka Differs from Traditional Message Queues
Traditional message queues (RabbitMQ, ActiveMQ) delete messages after delivery. Kafka retains messages for a configurable duration or size limit, which lets multiple consumers process the same message stream independently and at different rates. This retention model fundamentally changes how you build systems, unlocking patterns such as event sourcing and CQRS.
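Retention is a per-topic setting. As a sketch with the KafkaJS admin client, the following creates a topic that keeps events for seven days (all values are illustrative):
const admin = kafka.admin();
await admin.connect();
await admin.createTopics({
topics: [{
topic: 'orders',
numPartitions: 6,
replicationFactor: 3,
configEntries: [
{ name: 'retention.ms', value: '604800000' }, // keep events for 7 days
],
}],
});
await admin.disconnect();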
sequenceDiagram
participant Producer
participant Topic as Kafka Topic/Partition
participant GroupA as Consumer Group A
participant GroupB as Consumer Group B
Producer->>Topic: Publish Event 1
Producer->>Topic: Publish Event 2
Producer->>Topic: Publish Event 3
Topic->>GroupA: Read at offset 0
GroupA->>GroupA: Process Event 1
Topic->>GroupB: Read at offset 0
GroupB->>GroupB: Process Event 1
Topic->>GroupA: Read at offset 1
GroupA->>GroupA: Process Event 2
Topic->>GroupB: Read at offset 1
GroupB->>GroupB: Process Event 2
Note over GroupA,GroupB: Both groups independently<br/>process the same events
Core Principles
- Immutability: events are written once and never modified, creating a trustworthy record
- Ordering: within a partition, events maintain strict order; global ordering requires a single partition
- Durability: configurable replication provides fault tolerance and data persistence
- Scalability: partitioning enables near-linear scaling of throughput and processing capacity
- Decoupling: producers and consumers are independent; a failure in one does not directly affect the other
- Retention: messages persist for the retention period, enabling replay and onboarding of new consumers
- Replay: rewind a consumer offset to reprocess events, enabling corrections and analytics
Kafka Architecture
Brokers and Clusters
A Kafka cluster consists of multiple brokers (servers) that work together to store and serve events. Each broker holds a subset of partitions and serves requests from producers and consumers. The cluster automatically elects a controller broker, which coordinates administrative tasks such as partition assignment and leader election.
Topics and Partitions
A topic is a named logical grouping of events with its own configuration (replication factor, retention policy, compression). Topics are split into multiple partitions, and each partition is replicated across several brokers for fault tolerance. The replication factor (typically 3) means each partition has one leader replica and several follower replicas.
Replication and In-Sync Replicas (ISR)
The ISR (In-Sync Replicas) set contains the leader and all followers that are caught up with the leader. When you write to a partition with acks=all, the broker waits for acknowledgment from all ISRs, which guarantees durability. If a broker fails, one of the ISR followers automatically becomes the new leader.
This replication strategy provides:
- Fault tolerance: broker failures are survived without data loss
- Availability: automatic failover when brokers go down
- Tunable durability: trade latency for consistency via the acks configuration (see the send example below)
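In KafkaJS, acks is set per send() call; a quick sketch:
await producer.send({
topic: 'orders',
acks: -1, // wait for all in-sync replicas: strongest durability, higher latency
// acks: 1 waits for the leader only; acks: 0 does not wait at all
messages: [{ key: 'order-123', value: JSON.stringify(orderData) }],
});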
ZooKeeper/KRaft Mode
Modern Kafka (3.3+) uses KRaft (Kafka Raft) mode, which eliminates the external ZooKeeper dependency. KRaft folds cluster metadata management into Kafka itself, simplifying operations and improving reliability.
Producer Implementation with KafkaJS
Producers send events to Kafka topics. Here is a production-quality implementation in TypeScript:
import { Kafka, logLevel, CompressionTypes } from 'kafkajs';
interface ProducerConfig {
brokers: string[];
clientId: string;
idempotent?: boolean;
maxInFlightRequests?: number;
compression?: CompressionTypes;
transactionalId?: string; // required for transactional sends
}
class KafkaProducer {
private kafka: Kafka;
private producer: any;
private compression: CompressionTypes;
constructor(config: ProducerConfig) {
this.kafka = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
logLevel: logLevel.INFO,
ssl: true,
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_USER || '',
password: process.env.KAFKA_PASSWORD || '',
},
});
this.producer = this.kafka.producer({
idempotent: config.idempotent ?? true,
maxInFlightRequests: config.maxInFlightRequests ?? 5,
transactionalId: config.transactionalId, // needed for transaction()
retry: {
initialRetryTime: 100,
retries: 8,
factor: 0.2, // retry timing randomization
},
});
// KafkaJS applies compression per send(), not in the producer config
this.compression = config.compression ?? CompressionTypes.Snappy;
}
async connect(): Promise<void> {
await this.producer.connect();
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
}
// Single event with key-based partitioning
async sendEvent(topic: string, key: string, value: any): Promise<void> {
await this.producer.send({
topic,
compression: this.compression,
messages: [
{
key,
value: JSON.stringify(value),
timestamp: Date.now().toString(),
},
],
});
}
// Batch send for higher throughput
async sendBatch(topic: string, events: Array<{ key: string; value: any }>): Promise<void> {
await this.producer.send({
topic,
messages: events.map(({ key, value }) => ({
key,
value: JSON.stringify(value),
timestamp: Date.now().toString(),
})),
// Batch settings for throughput
timeout: 30000,
compression: this.compression,
});
}
// Transactional writes (exactly-once)
async sendTransactional(
topic: string,
events: Array<{ key: string; value: any }>
): Promise<void> {
const transaction = await this.producer.transaction();
try {
await transaction.send({
topic,
messages: events.map(({ key, value }) => ({
key,
value: JSON.stringify(value),
})),
});
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
}
}
// Usage example
const producer = new KafkaProducer({
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
clientId: 'order-service',
idempotent: true,
compression: CompressionTypes.Snappy,
});
await producer.connect();
// Send order event
await producer.sendEvent('orders', 'order-123', {
orderId: 'order-123',
customerId: 'cust-456',
amount: 99.99,
timestamp: new Date().toISOString(),
});
await producer.disconnect();
// pom.xml dependency: org.springframework.kafka:spring-kafka
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.SendResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
import java.util.concurrent.CompletableFuture;
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper = new ObjectMapper();
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// Single event with key-based partitioning
public CompletableFuture<SendResult<String, String>> sendEvent(
String topic, String key, Object value) throws Exception {
String json = objectMapper.writeValueAsString(value);
return kafkaTemplate.send(topic, key, json);
}
// Batch send for higher throughput
public void sendBatch(String topic, List<Map<String, Object>> events) throws Exception {
for (Map<String, Object> event : events) {
String key = (String) event.get("key");
String json = objectMapper.writeValueAsString(event.get("value"));
kafkaTemplate.send(topic, key, json);
}
kafkaTemplate.flush();
}
// Transactional writes (exactly-once)
public void sendTransactional(String topic, List<Map<String, Object>> events) throws Exception {
kafkaTemplate.executeInTransaction(kt -> {
for (Map<String, Object> event : events) {
try {
String key = (String) event.get("key");
String json = objectMapper.writeValueAsString(event.get("value"));
kt.send(topic, key, json);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return true;
});
}
}
// Spring Boot configuration
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
config.put(ProducerConfig.CLIENT_ID_CONFIG, "order-service");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
config.put(ProducerConfig.RETRIES_CONFIG, 8);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
# pip install confluent-kafka aiokafka
import asyncio
import json
import os
import time
from dataclasses import dataclass
from typing import Any
from aiokafka import AIOKafkaProducer
from aiokafka.helpers import create_ssl_context
@dataclass
class ProducerConfig:
brokers: list[str]
client_id: str
idempotent: bool = True
compression: str = "snappy"
class KafkaProducerService:
def __init__(self, config: ProducerConfig):
self.config = config
self.producer: AIOKafkaProducer | None = None
async def connect(self) -> None:
ssl_context = create_ssl_context()
self.producer = AIOKafkaProducer(
bootstrap_servers=",".join(self.config.brokers),
client_id=self.config.client_id,
enable_idempotence=self.config.idempotent,
compression_type=self.config.compression,
max_in_flight_requests_per_connection=5,
request_timeout_ms=30000,
retry_backoff_ms=100,
ssl_context=ssl_context,
security_protocol="SASL_SSL",  # required for SASL auth over TLS
sasl_mechanism="PLAIN",
sasl_plain_username=os.getenv("KAFKA_USER", ""),
sasl_plain_password=os.getenv("KAFKA_PASSWORD", ""),
)
await self.producer.start()
async def disconnect(self) -> None:
if self.producer:
await self.producer.stop()
# Single event with key-based partitioning
async def send_event(self, topic: str, key: str, value: Any) -> None:
await self.producer.send(
topic,
key=key.encode(),
value=json.dumps(value).encode(),
timestamp_ms=int(time.time() * 1000),  # wall-clock epoch millis
)
# Batch send for higher throughput
async def send_batch(self, topic: str, events: list[dict]) -> None:
batch = self.producer.create_batch()
for event in events:
batch.append(
key=event["key"].encode(),
value=json.dumps(event["value"]).encode(),
timestamp=None,
)
partitions = await self.producer.partitions_for(topic)
partition = next(iter(partitions))
await self.producer.send_batch(batch, topic, partition=partition)
# Transactional writes (exactly-once)
async def send_transactional(self, topic: str, events: list[dict]) -> None:
async with self.producer.transaction():
for event in events:
await self.producer.send(
topic,
key=event["key"].encode(),
value=json.dumps(event["value"]).encode(),
)
# Usage example
async def main():
producer = KafkaProducerService(ProducerConfig(
brokers=["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
client_id="order-service",
idempotent=True,
compression="snappy",
))
await producer.connect()
await producer.send_event("orders", "order-123", {
"orderId": "order-123",
"customerId": "cust-456",
"amount": 99.99,
"timestamp": "2024-01-01T00:00:00Z",
})
await producer.disconnect()
// NuGet: Confluent.Kafka
using Confluent.Kafka;
using System.Text.Json;
public class ProducerConfig
{
public string[] Brokers { get; set; } = [];
public string ClientId { get; set; } = "";
public bool Idempotent { get; set; } = true;
public CompressionType Compression { get; set; } = CompressionType.Snappy;
}
public class KafkaProducerService : IDisposable
{
private readonly IProducer<string, string> _producer;
public KafkaProducerService(ProducerConfig config)
{
var producerConfig = new Confluent.Kafka.ProducerConfig
{
BootstrapServers = string.Join(",", config.Brokers),
ClientId = config.ClientId,
EnableIdempotence = config.Idempotent,
CompressionType = config.Compression,
MaxInFlight = 5,
MessageTimeoutMs = 30000,
Acks = Acks.All,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = Environment.GetEnvironmentVariable("KAFKA_USER") ?? "",
SaslPassword = Environment.GetEnvironmentVariable("KAFKA_PASSWORD") ?? "",
SecurityProtocol = SecurityProtocol.SaslSsl,
};
_producer = new ProducerBuilder<string, string>(producerConfig).Build();
}
// Single event with key-based partitioning
public async Task SendEventAsync(string topic, string key, object value)
{
var json = JsonSerializer.Serialize(value);
await _producer.ProduceAsync(topic, new Message<string, string>
{
Key = key,
Value = json,
Timestamp = new Timestamp(DateTime.UtcNow),
});
}
// Batch send for higher throughput
public async Task SendBatchAsync(string topic, IEnumerable<(string Key, object Value)> events)
{
foreach (var (key, value) in events)
{
var json = JsonSerializer.Serialize(value);
_producer.Produce(topic, new Message<string, string> { Key = key, Value = json });
}
_producer.Flush(TimeSpan.FromSeconds(30));
await Task.CompletedTask;
}
// Transactional writes (exactly-once)
public async Task SendTransactionalAsync(string topic, IEnumerable<(string Key, object Value)> events)
{
_producer.InitTransactions(TimeSpan.FromSeconds(30));
_producer.BeginTransaction();
try
{
foreach (var (key, value) in events)
{
var json = JsonSerializer.Serialize(value);
_producer.Produce(topic, new Message<string, string> { Key = key, Value = json });
}
_producer.CommitTransaction();
}
catch
{
_producer.AbortTransaction();
throw;
}
await Task.CompletedTask;
}
public void Dispose() => _producer.Dispose();
}
Partitioning Strategies
The partition key determines which partition receives an event:
// Strategy 1: Hash-based (default) — key is hashed to partition
// Same key always goes to same partition (ordered within key)
await producer.sendEvent('orders', 'customer-123', orderData);
// Strategy 2: Round-robin (null key)
// Distributes events evenly across partitions, but no ordering
await producer.sendEvent('events', null, eventData);
// Strategy 3: Custom strategy — use business domain
// Use customer ID to co-locate related events
const customerId = order.customerId;
await producer.sendEvent('orders', customerId, orderData);
// Strategy 1: Hash-based — same key always goes to same partition
producerService.sendEvent("orders", "customer-123", orderData).get();
// Strategy 2: Round-robin (null key)
producerService.sendEvent("events", null, eventData).get();
// Strategy 3: Custom strategy — use business domain
String customerId = order.getCustomerId();
producerService.sendEvent("orders", customerId, orderData).get();# Strategy 1: Hash-based — same key always goes to same partition
await producer.send_event("orders", "customer-123", order_data)
# Strategy 2: Round-robin (null key) — distribute evenly
await producer.producer.send("events", value=json.dumps(event_data).encode())
# Strategy 3: Custom strategy — use business domain
customer_id = order["customerId"]
await producer.send_event("orders", customer_id, order_data)// Strategy 1: Hash-based — same key always goes to same partition
await producerService.SendEventAsync("orders", "customer-123", orderData);
// Strategy 2: Round-robin (null key)
await producerService.SendEventAsync("events", null!, eventData);
// Strategy 3: Custom strategy — use business domain
var customerId = order.CustomerId;
await producerService.SendEventAsync("orders", customerId, orderData);พิจารณาการออกแบบหลัก:
- Hot partitions: avoid keys that concentrate traffic, where all events land on a single partition (one mitigation is sketched after this list)
- Ordering requirements: use keys to guarantee ordering of related events
- Distribution: make sure keys spread evenly across partitions for load balancing
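A common mitigation for hot keys is key salting: append a bounded random suffix so one hot key spreads across several partitions. The sketch below is illustrative and trades away strict per-key ordering:
// Illustrative sketch: spread a known-hot key across N sub-keys.
// Events for the same original key no longer share one partition,
// so strict per-key ordering is lost.
function saltKey(key: string, buckets: number): string {
const salt = Math.floor(Math.random() * buckets);
return `${key}#${salt}`;
}
await producer.sendEvent('events', saltKey('tenant-hot', 8), eventData);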
Batching and Compression
Batching multiple events before sending significantly increases throughput:
// Batch events together
const events: Array<{ key: string; value: any }> = [];
for (let i = 0; i < 1000; i++) {
events.push({
key: `customer-${i % 100}`,
value: { eventId: i, timestamp: Date.now() },
});
}
// Send batch with compression
await producer.sendBatch('events', events);
List<Map<String, Object>> events = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
events.add(Map.of(
"key", "customer-" + (i % 100),
"value", Map.of("eventId", i, "timestamp", System.currentTimeMillis())
));
}
// Send batch with compression
producerService.sendBatch("events", events);events = [
{
"key": f"customer-{i % 100}",
"value": {"eventId": i, "timestamp": int(asyncio.get_event_loop().time() * 1000)},
}
for i in range(1000)
]
# Send batch with compression
await producer.send_batch("events", events)var events = Enumerable.Range(0, 1000).Select(i => (
Key: $"customer-{i % 100}",
Value: (object)new { eventId = i, timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }
));
// Send batch with compression
await producerService.SendBatchAsync("events", events);
Compression reduces network bandwidth and storage:
- Snappy: a good balance of speed and compression ratio (recommended for most workloads)
- LZ4: fastest, with a lower compression ratio
- Gzip: best compression, but slower
- Zstd: modern, with excellent ratio and speed (Kafka 2.1+)
Consumer Implementation
Consumers read events from topics independently. Here is a production-grade consumer:
import { Kafka, logLevel } from 'kafkajs';
interface ConsumerConfig {
brokers: string[];
groupId: string;
clientId: string;
topics: string[];
}
class KafkaConsumer {
private kafka: Kafka;
private consumer: any;
private groupId: string;
constructor(config: ConsumerConfig) {
this.kafka = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
logLevel: logLevel.INFO,
ssl: true,
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_USER || '',
password: process.env.KAFKA_PASSWORD || '',
},
});
this.groupId = config.groupId;
this.consumer = this.kafka.consumer({
groupId: config.groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB
// Offset commit strategy
allowAutoTopicCreation: false,
});
}
async connect(): Promise<void> {
await this.consumer.connect();
}
async subscribe(topics: string[]): Promise<void> {
await this.consumer.subscribe({ topics, fromBeginning: false });
}
async run(
onMessage: (topic: string, partition: number, message: any) => Promise<void>
): Promise<void> {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const value = JSON.parse(message.value?.toString() || '{}');
await onMessage(topic, partition, value);
// Offset committed automatically with autoCommit
} catch (error) {
console.error(`Error processing message from ${topic}:${partition}`, error);
// Handle poison pill messages appropriately
throw error;
}
},
// Auto-commit tuning; use manual commits for critical applications
autoCommitInterval: 5000,
autoCommitThreshold: 100,
});
}
async disconnect(): Promise<void> {
await this.consumer.disconnect();
}
// Reset offset for replaying events
async resetOffset(topic: string, offset: string): Promise<void> {
await this.consumer.seek({ topic, partition: 0, offset });
}
}
// Usage example
const consumer = new KafkaConsumer({
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
groupId: 'order-processing-service',
clientId: 'order-processor-1',
topics: ['orders'],
});
await consumer.connect();
await consumer.subscribe(['orders']);
await consumer.run(async (topic, partition, message) => {
console.log(`Processing order from partition ${partition}:`, message);
// Process order (call external API, update database, etc.)
await processOrder(message);
// Offset is committed automatically by eachMessage
});
// Spring Kafka @KafkaListener approach
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.Acknowledgment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
@Service
public class KafkaConsumerService {
@KafkaListener(
topics = "orders",
groupId = "order-processing-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
var value = objectMapper.readValue(record.value(), Map.class);
System.out.printf("Processing order from partition %d: %s%n",
record.partition(), value);
processOrder(value);
ack.acknowledge(); // Manual commit after successful processing
} catch (Exception e) {
log.error("Error processing message from {}:{}", record.topic(), record.partition(), e);
throw new RuntimeException(e);
}
}
}
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-service");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "order-processor-1");
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
# pip install aiokafka
from aiokafka import AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
import asyncio
import json
import os
class KafkaConsumerService:
def __init__(self, brokers: list[str], group_id: str, client_id: str):
self.brokers = brokers
self.group_id = group_id
self.client_id = client_id
self.consumer: AIOKafkaConsumer | None = None
async def connect(self) -> None:
self.consumer = AIOKafkaConsumer(
bootstrap_servers=",".join(self.brokers),
group_id=self.group_id,
client_id=self.client_id,
session_timeout_ms=30000,
heartbeat_interval_ms=3000,
max_partition_fetch_bytes=1048576,
enable_auto_commit=True,
auto_commit_interval_ms=5000,
sasl_mechanism="PLAIN",
sasl_plain_username=os.getenv("KAFKA_USER", ""),
sasl_plain_password=os.getenv("KAFKA_PASSWORD", ""),
)
await self.consumer.start()
async def subscribe(self, topics: list[str]) -> None:
self.consumer.subscribe(topics)
async def run(self, on_message) -> None:
async for msg in self.consumer:
try:
value = json.loads(msg.value.decode())
await on_message(msg.topic, msg.partition, value)
except Exception as e:
print(f"Error processing message from {msg.topic}:{msg.partition}: {e}")
raise
async def disconnect(self) -> None:
if self.consumer:
await self.consumer.stop()
# Reset offset for replaying events
async def reset_offset(self, topic: str, partition: int, offset: int) -> None:
from aiokafka import TopicPartition
tp = TopicPartition(topic, partition)
self.consumer.seek(tp, offset)
# Usage example
async def main():
consumer = KafkaConsumerService(
brokers=["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
group_id="order-processing-service",
client_id="order-processor-1",
)
await consumer.connect()
await consumer.subscribe(["orders"])
async def handle_message(topic, partition, message):
print(f"Processing order from partition {partition}: {message}")
await process_order(message)
await consumer.run(handle_message)
// NuGet: Confluent.Kafka
using Confluent.Kafka;
using System.Text.Json;
public class KafkaConsumerService : BackgroundService
{
private readonly IConsumer<string, string> _consumer;
private readonly ILogger<KafkaConsumerService> _logger;
public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
{
_logger = logger;
var config = new ConsumerConfig
{
BootstrapServers = "kafka-1:9092,kafka-2:9092,kafka-3:9092",
GroupId = "order-processing-service",
ClientId = "order-processor-1",
SessionTimeoutMs = 30000,
HeartbeatIntervalMs = 3000,
MaxPartitionFetchBytes = 1048576,
EnableAutoCommit = true,
AutoCommitIntervalMs = 5000,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = Environment.GetEnvironmentVariable("KAFKA_USER") ?? "",
SaslPassword = Environment.GetEnvironmentVariable("KAFKA_PASSWORD") ?? "",
SecurityProtocol = SecurityProtocol.SaslSsl,
AutoOffsetReset = AutoOffsetReset.Latest,
};
_consumer = new ConsumerBuilder<string, string>(config).Build();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe("orders");
while (!stoppingToken.IsCancellationRequested)
{
try
{
var result = _consumer.Consume(stoppingToken);
var value = JsonSerializer.Deserialize<Dictionary<string, object>>(result.Message.Value);
_logger.LogInformation(
"Processing order from partition {Partition}: {Message}",
result.Partition.Value, value);
await ProcessOrderAsync(value!);
}
catch (OperationCanceledException) { break; }
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message");
throw;
}
}
_consumer.Close();
}
// Reset offset for replaying events
public void ResetOffset(string topic, int partition, long offset)
{
_consumer.Seek(new TopicPartitionOffset(topic, partition, offset));
}
private Task ProcessOrderAsync(Dictionary<string, object> message) =>
Task.CompletedTask; // Replace with real logic
}
Consumer Groups and Rebalancing
A consumer group coordinates multiple consumers so they process a topic in parallel. Kafka assigns partitions to consumers automatically:
Topic: orders (6 partitions)
Consumer Group: order-processors
├── Consumer 1 → [Partition 0, 2, 4]
├── Consumer 2 → [Partition 1, 3, 5]
└── Consumer 3 → [] (joins → rebalancing occurs)
// After rebalancing:
├── Consumer 1 → [Partition 0, 3]
├── Consumer 2 → [Partition 1, 4]
└── Consumer 3 → [Partition 2, 5]
Rebalancing occurs when consumers join or leave the group. During a rebalance:
- All consumers in the group pause processing
- Partitions are reassigned
- Consumers resume from their committed offsets
To reduce the impact of rebalancing:
- Set appropriate session timeouts (30 seconds in the examples above)
- Make sure consumer processing completes before the timeout
- Use static group membership in containerized environments to avoid unnecessary rebalances (see the sketch below)
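KafkaJS does not expose static membership, but librdkafka-based clients do. A sketch assuming the node-rdkafka library: pinning a stable group.instance.id per pod lets a restarted instance reclaim its partitions without triggering a full rebalance (Kafka 2.3+ brokers):
import Kafka from 'node-rdkafka';
// group.instance.id marks this consumer as a static member, so a quick
// restart keeps its partition assignment instead of forcing a rebalance.
const consumer = new Kafka.KafkaConsumer({
'metadata.broker.list': 'kafka-1:9092',
'group.id': 'order-processors',
'group.instance.id': process.env.POD_NAME || 'order-processor-1',
'session.timeout.ms': 30000,
}, {});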
Offset Management
Offsets track each consumer's progress within a partition. Common commit strategies:
// 1. Auto-commit (default) — simple but risky
// Risk: offsets may be committed before processing finishes, so a crash can lose messages
// 2. Manual commit after processing
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
await processMessage(message);
// Commit only after successful processing
await consumer.commitOffsets([
{
topic,
partition,
offset: (parseInt(message.offset) + 1).toString(),
},
]);
} catch (error) {
// Don't commit, consumer will retry from this offset
throw error;
}
},
autoCommit: false,
});
// 3. Batch commit — balance throughput and safety
// Commit after processing N messages
let processedCount = 0;
const BATCH_SIZE = 100;
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
await processMessage(message);
processedCount++;
if (processedCount >= BATCH_SIZE) {
await consumer.commitOffsets([
{
topic,
partition,
offset: (parseInt(message.offset) + 1).toString(),
},
]);
processedCount = 0;
}
},
autoCommit: false,
});
// 1. Auto-commit — set enable.auto.commit=true in config (default)
// 2. Manual commit after processing (AckMode.MANUAL)
@KafkaListener(topics = "orders", groupId = "order-processors")
public void consumeManual(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
processMessage(record.value());
ack.acknowledge(); // Commit only after successful processing
} catch (Exception e) {
// Don't ack — consumer will retry from this offset
throw new RuntimeException(e);
}
}
// 3. Batch commit — AckMode.COUNT
// In container factory: factory.getContainerProperties().setAckCount(100);
@KafkaListener(topics = "orders", groupId = "order-processors",
containerFactory = "batchListenerContainerFactory")
public void consumeBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
for (var record : records) {
processMessage(record.value());
}
ack.acknowledge(); // Commit after entire batch
}
from aiokafka import AIOKafkaConsumer, TopicPartition
# 2. Manual commit after processing
consumer = AIOKafkaConsumer(
"orders",
bootstrap_servers="kafka-1:9092",
group_id="order-processors",
enable_auto_commit=False,
)
await consumer.start()
async for msg in consumer:
try:
await process_message(json.loads(msg.value.decode()))
# Commit only after successful processing
await consumer.commit({
TopicPartition(msg.topic, msg.partition): msg.offset + 1
})
except Exception as e:
# Don't commit — consumer will retry from this offset
raise
# 3. Batch commit — process N messages then commit
processed_count = 0
BATCH_SIZE = 100
last_offsets = {}
async for msg in consumer:
await process_message(json.loads(msg.value.decode()))
processed_count += 1
last_offsets[TopicPartition(msg.topic, msg.partition)] = msg.offset + 1
if processed_count >= BATCH_SIZE:
await consumer.commit(last_offsets)
processed_count = 0
last_offsets = {}
// 2. Manual commit after processing
var config = new ConsumerConfig
{
EnableAutoCommit = false,
// ...other config
};
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("orders");
while (true)
{
var result = consumer.Consume();
try
{
await ProcessMessageAsync(result.Message.Value);
// Commit only after successful processing
consumer.Commit(result);
}
catch (Exception)
{
// Don't commit — consumer will retry from this offset
throw;
}
}
// 3. Batch commit — commit after N messages
int processedCount = 0;
const int BATCH_SIZE = 100;
ConsumeResult<string, string>? lastResult = null;
while (true)
{
var result = consumer.Consume();
await ProcessMessageAsync(result.Message.Value);
processedCount++;
lastResult = result;
if (processedCount >= BATCH_SIZE && lastResult != null)
{
consumer.Commit(lastResult);
processedCount = 0;
lastResult = null;
}
}
Delivery Guarantees
Kafka supports three delivery semantics, each with its own tradeoffs:
At-Least-Once (Most Common)
The producer retries failed sends; the consumer commits offsets after processing.
Risk: duplicates. If the consumer fails after processing but before committing the offset, the same message is processed twice.
// Configuration for at-least-once
// In KafkaJS, retries live on the producer; acks is set per send()
const producer = kafka.producer({
retry: { retries: 5 },
});
await producer.send({ topic, acks: 1, messages }); // wait for leader acknowledgment
// Consumer: commit offset after processing
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
await processMessage(message); // Must be idempotent!
await consumer.commitOffsets([...]);
},
});
// Configuration for at-least-once
@Bean
public ProducerFactory<String, String> atLeastOnceProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.ACKS_CONFIG, "1"); // Leader acknowledgment
config.put(ProducerConfig.RETRIES_CONFIG, 5);
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
return new DefaultKafkaProducerFactory<>(config);
}
// Consumer: commit offset after processing (AckMode.MANUAL)
@KafkaListener(topics = "orders", groupId = "processors")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
processMessage(record.value()); // Must be idempotent!
ack.acknowledge();
}
# Configuration for at-least-once
producer = AIOKafkaProducer(
bootstrap_servers="kafka-1:9092",
acks=1, # Wait for leader acknowledgment
request_timeout_ms=30000,
)
# Consumer: commit offset after processing
async for msg in consumer:
await process_message(json.loads(msg.value.decode())) # Must be idempotent!
await consumer.commit({TopicPartition(msg.topic, msg.partition): msg.offset + 1})
// Configuration for at-least-once
var producerConfig = new Confluent.Kafka.ProducerConfig
{
Acks = Acks.Leader, // Wait for leader acknowledgment
MessageTimeoutMs = 30000,
// Retries handled by EnableIdempotence or manual retry logic
};
// Consumer: commit offset after processing
while (true)
{
var result = consumer.Consume();
await ProcessMessageAsync(result.Message.Value); // Must be idempotent!
consumer.Commit(result);
}
Mitigation: make message processing idempotent, for example with deduplication:
// Idempotent processing example
async function processOrderIdempotent(order: any) {
// Check if we've already processed this order
const exists = await database.query(
'SELECT id FROM processed_orders WHERE order_id = ?',
[order.orderId]
);
if (exists) {
console.log('Order already processed, skipping');
return;
}
// Process order
await database.execute('INSERT INTO orders VALUES (?)', [order]);
await database.execute('INSERT INTO processed_orders VALUES (?, ?)', [
order.orderId,
Date.now(),
]);
}
@Service
public class IdempotentOrderService {
private final JdbcTemplate jdbcTemplate;
public void processOrderIdempotent(Map<String, Object> order) {
String orderId = (String) order.get("orderId");
// Check if we've already processed this order
Integer count = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders WHERE order_id = ?",
Integer.class, orderId);
if (count != null && count > 0) {
log.info("Order already processed, skipping: {}", orderId);
return;
}
// Process order
jdbcTemplate.update("INSERT INTO orders VALUES (?)", orderId);
jdbcTemplate.update(
"INSERT INTO processed_orders VALUES (?, ?)",
orderId, System.currentTimeMillis());
}
}
async def process_order_idempotent(order: dict, db) -> None:
order_id = order["orderId"]
# Check if we've already processed this order
result = await db.fetch_one(
"SELECT id FROM processed_orders WHERE order_id = :order_id",
{"order_id": order_id},
)
if result:
print(f"Order already processed, skipping: {order_id}")
return
# Process order
await db.execute("INSERT INTO orders VALUES (:order)", {"order": order_id})
await db.execute(
"INSERT INTO processed_orders VALUES (:order_id, :ts)",
{"order_id": order_id, "ts": int(asyncio.get_event_loop().time() * 1000)},
)
public class IdempotentOrderService
{
private readonly IDbConnection _db;
public async Task ProcessOrderIdempotentAsync(Dictionary<string, object> order)
{
var orderId = order["orderId"].ToString()!;
// Check if we've already processed this order
var exists = await _db.QueryFirstOrDefaultAsync<int>(
"SELECT COUNT(1) FROM processed_orders WHERE order_id = @OrderId",
new { OrderId = orderId });
if (exists > 0)
{
Console.WriteLine($"Order already processed, skipping: {orderId}");
return;
}
// Process order
await _db.ExecuteAsync("INSERT INTO orders VALUES (@OrderId)", new { OrderId = orderId });
await _db.ExecuteAsync(
"INSERT INTO processed_orders VALUES (@OrderId, @Ts)",
new { OrderId = orderId, Ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() });
}
}
At-Most-Once
The producer sends once, without retries; the consumer commits offsets before processing.
Risk: message loss if processing fails before it completes.
// Configuration for at-most-once
// In KafkaJS, acks is set per send(); acks: 0 means fire-and-forget
const producer = kafka.producer({
retry: { retries: 0 },
});
await producer.send({ topic, acks: 0, messages }); // no acknowledgment
// Consumer: commit offset before processing
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
await consumer.commitOffsets([...]); // Commit first
await processMessage(message); // Process after
},
});
// Configuration for at-most-once
@Bean
public ProducerFactory<String, String> atMostOnceProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.ACKS_CONFIG, "0"); // No acknowledgment
config.put(ProducerConfig.RETRIES_CONFIG, 0);
return new DefaultKafkaProducerFactory<>(config);
}
// Consumer: commit offset before processing
@KafkaListener(topics = "orders", groupId = "processors")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
ack.acknowledge(); // Commit first
processMessage(record.value()); // Process after
}
# Configuration for at-most-once
producer = AIOKafkaProducer(
bootstrap_servers="kafka-1:9092",
acks=0, # No acknowledgment, fire and forget
)
# Consumer: commit offset before processing
async for msg in consumer:
await consumer.commit({ # Commit first
TopicPartition(msg.topic, msg.partition): msg.offset + 1
})
await process_message(json.loads(msg.value.decode()))  # Process after
// Configuration for at-most-once
var producerConfig = new Confluent.Kafka.ProducerConfig
{
Acks = Acks.None, // No acknowledgment, fire and forget
};
// Consumer: commit offset before processing
while (true)
{
var result = consumer.Consume();
consumer.Commit(result); // Commit first
await ProcessMessageAsync(result.Message.Value); // Process after
}
Exactly-Once (Idempotent Producer + Transactions)
An idempotent producer deduplicates on the broker, and transactional writes ensure atomicity.
// Producer configuration for exactly-once
const producer = kafka.producer({
idempotent: true, // Enable idempotent producer
maxInFlightRequests: 5, // Maintain ordering
transactionalId: 'order-tx-1', // Required for transactions
transactionTimeout: 60000,
});
// Transactional write
const transaction = await producer.transaction();
try {
await transaction.send({
topic: 'orders',
messages: [
{ key: orderId, value: JSON.stringify(orderData) },
],
});
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
// Consumer: KafkaJS reads committed messages by default
const consumer = kafka.consumer({
groupId: 'order-processors',
readUncommitted: false, // read committed — only committed messages
});
// Producer configuration for exactly-once
@Bean
public ProducerFactory<String, String> exactlyOnceProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-1");
config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
return new DefaultKafkaProducerFactory<>(config);
}
// Transactional write
kafkaTemplate.executeInTransaction(kt -> {
kt.send("orders", orderId, objectMapper.writeValueAsString(orderData));
return true;
});
// Consumer: read committed isolation level
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
# Producer configuration for exactly-once
producer = AIOKafkaProducer(
bootstrap_servers="kafka-1:9092",
enable_idempotence=True,
max_in_flight_requests_per_connection=5,
transactional_id="order-tx-1",
transaction_timeout_ms=60000,
)
await producer.start()
# Transactional write
async with producer.transaction():
await producer.send("orders", key=order_id.encode(),
value=json.dumps(order_data).encode())
# Consumer: read committed isolation level
consumer = AIOKafkaConsumer(
"orders",
bootstrap_servers="kafka-1:9092",
group_id="order-processors",
isolation_level="read_committed",
)
// Producer configuration for exactly-once
var producerConfig = new Confluent.Kafka.ProducerConfig
{
EnableIdempotence = true,
MaxInFlight = 5,
TransactionalId = "order-tx-1",
TransactionTimeoutMs = 60000,
};
var producer = new ProducerBuilder<string, string>(producerConfig).Build();
// Transactional write
producer.InitTransactions(TimeSpan.FromSeconds(30));
producer.BeginTransaction();
try
{
producer.Produce("orders", new Message<string, string>
{
Key = orderId,
Value = JsonSerializer.Serialize(orderData),
});
producer.CommitTransaction();
}
catch
{
producer.AbortTransaction();
throw;
}
// Consumer: read committed isolation level
var consumerConfig = new ConsumerConfig
{
IsolationLevel = IsolationLevel.ReadCommitted,
GroupId = "order-processors",
};
Guarantees:
- Idempotent producer: the broker deduplicates using the producer ID and sequence numbers
- Transactional writes: writes across multiple topics succeed or fail atomically
- Read committed: consumers see only committed messages, never uncommitted writes from failed transactions
Schema Registry and Schema Evolution
Schemas define the structure of events. A Schema Registry lets teams evolve schemas safely:
import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry';
const schemaRegistry = new SchemaRegistry({
host: 'http://schema-registry:8081',
auth: {
username: process.env.SR_USER || '',
password: process.env.SR_PASSWORD || '',
},
});
// Define an order event schema
const orderSchema = {
type: 'record',
name: 'Order',
namespace: 'com.example.orders',
fields: [
{ name: 'orderId', type: 'string' },
{ name: 'customerId', type: 'string' },
{ name: 'amount', type: 'double' },
{ name: 'timestamp', type: 'long' },
// New field with default (backward compatible)
{ name: 'currency', type: 'string', default: 'USD' },
],
};
// Register schema (returns the registry id for this schema version)
const { id: schemaId } = await schemaRegistry.register(
{
type: SchemaType.AVRO,
schema: JSON.stringify(orderSchema),
},
{ subject: 'orders-value' }
);
// Serialize with schema
async function serializeOrder(order: any): Promise<Buffer> {
const latestId = await schemaRegistry.getLatestSchemaId('orders-value');
return schemaRegistry.encode(latestId, order);
}
// Deserialize with schema (the schema id is embedded in the message envelope)
async function deserializeOrder(buffer: Buffer): Promise<any> {
return schemaRegistry.decode(buffer);
}
// Send Avro-serialized event
const serialized = await serializeOrder({
orderId: 'order-123',
customerId: 'cust-456',
amount: 99.99,
timestamp: Date.now(),
currency: 'USD',
});
await producer.send({
topic: 'orders',
messages: [{ key: 'order-123', value: serialized }],
});
// Maven: io.confluent:kafka-avro-serializer
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@Service
public class SchemaRegistryProducerService {
private final KafkaTemplate<String, GenericRecord> avroTemplate;
private static final String ORDER_SCHEMA_JSON = """
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "timestamp", "type": "long"},
{"name": "currency", "type": "string", "default": "USD"}
]
}""";
private final Schema orderSchema = new Schema.Parser().parse(ORDER_SCHEMA_JSON);
public void sendOrderEvent(String orderId, String customerId, double amount) {
GenericRecord record = new GenericData.Record(orderSchema);
record.put("orderId", orderId);
record.put("customerId", customerId);
record.put("amount", amount);
record.put("timestamp", System.currentTimeMillis());
record.put("currency", "USD");
avroTemplate.send("orders", orderId, record);
}
}
// Spring Boot config for Avro serializer
@Bean
public ProducerFactory<String, GenericRecord> avroProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
config.put("schema.registry.url", "http://schema-registry:8081");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
# pip install confluent-kafka[avro] fastavro
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka import SerializingProducer, DeserializingConsumer
import os
import time
schema_registry_client = SchemaRegistryClient({
"url": "http://schema-registry:8081",
"basic.auth.user.info": f"{os.getenv('SR_USER')}:{os.getenv('SR_PASSWORD')}",
})
order_schema_str = """
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "timestamp", "type": "long"},
{"name": "currency", "type": "string", "default": "USD"}
]
}"""
avro_serializer = AvroSerializer(schema_registry_client, order_schema_str)
avro_deserializer = AvroDeserializer(schema_registry_client)
producer = SerializingProducer({
"bootstrap.servers": "kafka-1:9092",
"key.serializer": lambda key, ctx: key.encode(),
"value.serializer": avro_serializer,
})
# Send Avro-serialized event
producer.produce(
topic="orders",
key="order-123",
value={
"orderId": "order-123",
"customerId": "cust-456",
"amount": 99.99,
"timestamp": int(time.time() * 1000),
"currency": "USD",
},
)
producer.flush()
// NuGet: Confluent.SchemaRegistry.Serdes.Avro
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Confluent.Kafka;
using Avro;
using Avro.Generic;
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://schema-registry:8081",
BasicAuthUserInfo = $"{Environment.GetEnvironmentVariable("SR_USER")}:{Environment.GetEnvironmentVariable("SR_PASSWORD")}",
};
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
const string orderSchemaJson = """
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "timestamp", "type": "long"},
{"name": "currency", "type": "string", "default": "USD"}
]
}""";
var schema = (RecordSchema)Schema.Parse(orderSchemaJson);
var producerConfig = new ProducerConfig { BootstrapServers = "kafka-1:9092" };
using var producer = new ProducerBuilder<string, GenericRecord>(producerConfig)
.SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
.Build();
// Send Avro-serialized event
var record = new GenericRecord(schema);
record.Add("orderId", "order-123");
record.Add("customerId", "cust-456");
record.Add("amount", 99.99);
record.Add("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
record.Add("currency", "USD");
await producer.ProduceAsync("orders", new Message<string, GenericRecord>
{
Key = "order-123",
Value = record,
});
Schema Evolution Rules
Maintain compatibility with a versioned evolution strategy:
BACKWARD compatibility: New schema can read data written with old schema
├── Add fields with defaults ✓
├── Remove fields ✗
└── Rename fields ✗
FORWARD compatibility: Old schema can read data written with new schema
├── Add fields ✓
├── Remove fields with defaults ✓
└── Rename fields ✗
FULL compatibility: Both backward and forward
├── Only add/remove fields with defaults ✓
Transitive compatibility: All versions can interoperate
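Before registering a new version, you can ask the registry whether it is compatible with what is already deployed. A sketch against the Confluent Schema Registry REST compatibility endpoint (host and subject are illustrative):
// Check a candidate schema against the latest registered version.
async function isCompatible(subject: string, schema: object): Promise<boolean> {
const res = await fetch(
`http://schema-registry:8081/compatibility/subjects/${subject}/versions/latest`,
{
method: 'POST',
headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
body: JSON.stringify({ schema: JSON.stringify(schema) }),
}
);
const { is_compatible } = await res.json();
return is_compatible;
}
await isCompatible('orders-value', orderSchema); // true if safe to register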
Event Sourcing and CQRS Patterns
Event sourcing makes events the source of truth: instead of storing only the final state, store every event that changed it:
// Event Sourcing: Store all events, derive state
interface OrderEvent {
eventType: 'OrderCreated' | 'OrderPaid' | 'OrderShipped' | 'OrderCancelled';
aggregateId: string;
timestamp: number;
data: any;
}
class OrderAggregate {
private events: OrderEvent[] = [];
private orderId: string;
async applyEvent(event: OrderEvent): Promise<void> {
this.events.push(event);
// Send to event store (Kafka)
await producer.sendEvent('order-events', event.aggregateId, event);
// Update read models for CQRS
await this.updateReadModel(event);
}
private async updateReadModel(event: OrderEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await database.execute(
'INSERT INTO order_summary (order_id, status) VALUES (?, ?)',
[event.aggregateId, 'pending']
);
break;
case 'OrderPaid':
await database.execute(
'UPDATE order_summary SET status = ? WHERE order_id = ?',
['paid', event.aggregateId]
);
break;
case 'OrderShipped':
await database.execute(
'UPDATE order_summary SET status = ? WHERE order_id = ?',
['shipped', event.aggregateId]
);
break;
}
}
// Rebuild state from event stream
static async loadFromEvents(
orderId: string,
events: OrderEvent[]
): Promise<OrderAggregate> {
const aggregate = new OrderAggregate();
aggregate.orderId = orderId;
aggregate.events = events;
return aggregate;
}
}
// Usage
const order = new OrderAggregate();
await order.applyEvent({
eventType: 'OrderCreated',
aggregateId: 'order-123',
timestamp: Date.now(),
data: {
customerId: 'cust-456',
items: [{ productId: 'prod-1', quantity: 2 }],
},
});
// Event Sourcing: Store all events, derive state
public sealed interface OrderEvent permits OrderCreated, OrderPaid, OrderShipped, OrderCancelled {
String aggregateId();
long timestamp();
}
public record OrderCreated(String aggregateId, long timestamp, String customerId, List<OrderItem> items) implements OrderEvent {}
public record OrderPaid(String aggregateId, long timestamp, double amount) implements OrderEvent {}
public record OrderShipped(String aggregateId, long timestamp, String trackingNumber) implements OrderEvent {}
@Service
public class OrderAggregate {
private final List<OrderEvent> events = new ArrayList<>();
private final KafkaProducerService producerService;
private final JdbcTemplate jdbcTemplate;
public void applyEvent(OrderEvent event) throws Exception {
events.add(event);
// Send to event store (Kafka)
producerService.sendEvent("order-events", event.aggregateId(),
objectMapper.writeValueAsString(event)).get();
// Update read models for CQRS
updateReadModel(event);
}
private void updateReadModel(OrderEvent event) {
switch (event) {
case OrderCreated e -> jdbcTemplate.update(
"INSERT INTO order_summary (order_id, status) VALUES (?, ?)",
e.aggregateId(), "pending");
case OrderPaid e -> jdbcTemplate.update(
"UPDATE order_summary SET status = ? WHERE order_id = ?",
"paid", e.aggregateId());
case OrderShipped e -> jdbcTemplate.update(
"UPDATE order_summary SET status = ? WHERE order_id = ?",
"shipped", e.aggregateId());
default -> {}
}
}
public static OrderAggregate loadFromEvents(String orderId, List<OrderEvent> events) {
var aggregate = new OrderAggregate(null, null);
aggregate.events.addAll(events);
return aggregate;
}
}
from dataclasses import dataclass, field
from typing import Literal
import asyncio
import time
@dataclass
class OrderEvent:
event_type: Literal["OrderCreated", "OrderPaid", "OrderShipped", "OrderCancelled"]
aggregate_id: str
timestamp: int
data: dict
class OrderAggregate:
def __init__(self, producer, db):
self.events: list[OrderEvent] = []
self.order_id: str = ""
self._producer = producer
self._db = db
async def apply_event(self, event: OrderEvent) -> None:
self.events.append(event)
# Send to event store (Kafka)
await self._producer.send_event("order-events", event.aggregate_id, {
"eventType": event.event_type,
"aggregateId": event.aggregate_id,
"timestamp": event.timestamp,
"data": event.data,
})
# Update read models for CQRS
await self._update_read_model(event)
async def _update_read_model(self, event: OrderEvent) -> None:
match event.event_type:
case "OrderCreated":
await self._db.execute(
"INSERT INTO order_summary (order_id, status) VALUES (:id, :status)",
{"id": event.aggregate_id, "status": "pending"},
)
case "OrderPaid":
await self._db.execute(
"UPDATE order_summary SET status = :status WHERE order_id = :id",
{"status": "paid", "id": event.aggregate_id},
)
case "OrderShipped":
await self._db.execute(
"UPDATE order_summary SET status = :status WHERE order_id = :id",
{"status": "shipped", "id": event.aggregate_id},
)
@classmethod
def load_from_events(cls, order_id: str, events: list[OrderEvent]) -> "OrderAggregate":
aggregate = cls(None, None)
aggregate.order_id = order_id
aggregate.events = events
return aggregate
# Usage
order = OrderAggregate(producer, db)
await order.apply_event(OrderEvent(
event_type="OrderCreated",
aggregate_id="order-123",
timestamp=int(time.time() * 1000),
data={"customerId": "cust-456", "items": [{"productId": "prod-1", "quantity": 2}]},
))
// Event Sourcing: Store all events, derive state
public abstract record OrderEvent(string AggregateId, long Timestamp);
public record OrderCreated(string AggregateId, long Timestamp, string CustomerId, List<OrderItem> Items) : OrderEvent(AggregateId, Timestamp);
public record OrderPaid(string AggregateId, long Timestamp, double Amount) : OrderEvent(AggregateId, Timestamp);
public record OrderShipped(string AggregateId, long Timestamp, string TrackingNumber) : OrderEvent(AggregateId, Timestamp);
public class OrderAggregate
{
private readonly List<OrderEvent> _events = [];
private readonly KafkaProducerService _producer;
private readonly IDbConnection _db;
public OrderAggregate(KafkaProducerService producer, IDbConnection db)
{
_producer = producer;
_db = db;
}
public async Task ApplyEventAsync(OrderEvent evt)
{
_events.Add(evt);
// Send to event store (Kafka)
await _producer.SendEventAsync("order-events", evt.AggregateId, evt);
// Update read models for CQRS
await UpdateReadModelAsync(evt);
}
private async Task UpdateReadModelAsync(OrderEvent evt)
{
var (sql, args) = evt switch
{
OrderCreated e => ("INSERT INTO order_summary (order_id, status) VALUES (@Id, @Status)",
(object?)new { Id = e.AggregateId, Status = "pending" }),
OrderPaid e => ("UPDATE order_summary SET status = @Status WHERE order_id = @Id",
(object?)new { Id = e.AggregateId, Status = "paid" }),
OrderShipped e => ("UPDATE order_summary SET status = @Status WHERE order_id = @Id",
(object?)new { Id = e.AggregateId, Status = "shipped" }),
_ => ((string?)null, (object?)null),
};
if (sql != null)
await _db.ExecuteAsync(sql, args);
}
public static OrderAggregate LoadFromEvents(string orderId, IEnumerable<OrderEvent> events)
{
var aggregate = new OrderAggregate(null!, null!);
aggregate._events.AddRange(events);
return aggregate;
}
}
Benefits:
- A complete audit log of every state change
- Time travel: reconstruct the state at any point in time
- Replay events for debugging or analytics
- Separate read and write models (CQRS)
Challenges:
- Eventual consistency needs to be managed
- Rebuilding state can be slow; snapshots are the usual mitigation (see the sketch after this list)
- Data deletion (GDPR) requires careful design
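A sketch of the snapshot mitigation, assuming a hypothetical snapshots table and the helpers named below: persist aggregate state every N events, then rebuild by replaying only the events after the latest snapshot:
const SNAPSHOT_INTERVAL = 100; // illustrative
async function saveSnapshotIfDue(orderId: string, version: number, state: object) {
if (version % SNAPSHOT_INTERVAL !== 0) return;
await database.execute(
'INSERT INTO snapshots (aggregate_id, version, state) VALUES (?, ?, ?)',
[orderId, version, JSON.stringify(state)]
);
}
// On load: start from the latest snapshot, replay only newer events.
async function loadOrder(orderId: string) {
const snap = await database.query(
'SELECT version, state FROM snapshots WHERE aggregate_id = ? ORDER BY version DESC LIMIT 1',
[orderId]
);
const fromVersion = snap ? snap.version + 1 : 0;
const events = await loadEventsSince(orderId, fromVersion); // hypothetical helper
return rebuild(snap ? JSON.parse(snap.state) : null, events); // hypothetical helper
}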
Kafka Streams: Stateful Stream Processing
Kafka Streams processes events with stateful operations such as aggregations and joins:
// Kafka Streams topology (pseudo-code with KafkaJS equivalent)
// 1. Stream-Stream Join: Correlate events from different topics
async function streamStreamJoin() {
// Consume from orders and payments topics
// Join on orderId within time window
// Emit joined events to order-fulfillment topic
const orderStream = new Map();
const paymentStream = new Map();
const joinWindow = 30000; // 30 seconds
await consumer.run({
eachMessage: async ({ topic, message }) => {
const { orderId, timestamp, ...data } = JSON.parse(message.value!.toString());
if (topic === 'orders') {
orderStream.set(orderId, { ...data, timestamp });
} else if (topic === 'payments') {
paymentStream.set(orderId, { ...data, timestamp });
}
// Check for join opportunity
if (orderStream.has(orderId) && paymentStream.has(orderId)) {
const order = orderStream.get(orderId);
const payment = paymentStream.get(orderId);
if (Math.abs(order.timestamp - payment.timestamp) <= joinWindow) {
// Emit joined event
await producer.sendEvent('order-fulfillment', orderId, {
orderId,
order,
payment,
joinedAt: Date.now(),
});
orderStream.delete(orderId);
paymentStream.delete(orderId);
}
}
},
});
}
// 2. Windowed Aggregation: Compute metrics over time windows
async function windowedAggregation() {
const windows = new Map();
const WINDOW_SIZE = 60000; // 1 minute
const WINDOW_ADVANCE = 10000; // 10 seconds
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value);
const windowKey = Math.floor(event.timestamp / WINDOW_ADVANCE);
if (!windows.has(windowKey)) {
windows.set(windowKey, { count: 0, sum: 0 });
}
const window = windows.get(windowKey);
window.count++;
window.sum += event.amount;
// Emit windowed metric
await producer.sendEvent('revenue-metrics', `window-${windowKey}`, {
window: windowKey,
count: window.count,
sum: window.sum,
avg: window.sum / window.count,
});
},
});
}
// 3. Stateful Map: Maintain per-key state
async function statefulProcessing() {
const userState = new Map();
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value);
const userId = message.key.toString();
if (!userState.has(userId)) {
userState.set(userId, { events: [], lastUpdate: 0 });
}
const state = userState.get(userId);
state.events.push(event);
state.lastUpdate = Date.now();
// Emit enriched event
await producer.sendEvent('enriched-events', userId, {
event,
userContext: {
totalEvents: state.events.length,
lastUpdate: state.lastUpdate,
},
});
},
});
}
// Kafka Streams DSL — native Java API
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
@Configuration
public class KafkaStreamsTopology {
// 1. Stream-Stream Join
@Bean
public KStream<String, String> streamStreamJoin(StreamsBuilder builder) {
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");
JoinWindows joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30));
orders.join(
payments,
(order, payment) -> {
    try {
        var joined = Map.of("order", order, "payment", payment, "joinedAt", System.currentTimeMillis());
        return objectMapper.writeValueAsString(joined);  // shared Jackson ObjectMapper
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);  // ValueJoiner cannot throw checked exceptions
    }
},
joinWindow,
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
).to("order-fulfillment");
return orders;
}
// 2. Windowed Aggregation
@Bean
public KStream<String, String> windowedAggregation(StreamsBuilder builder) {
builder.<String, String>stream("orders")
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
.advanceBy(Duration.ofSeconds(10)))
.aggregate(
() -> "{\"count\":0,\"sum\":0.0}",
(key, value, aggregate) -> {
// Parse and update aggregate
var agg = parseAggregate(aggregate);
var event = parseEvent(value);
agg.put("count", (int) agg.get("count") + 1);
agg.put("sum", (double) agg.get("sum") + (double) event.get("amount"));
agg.put("avg", (double) agg.get("sum") / (int) agg.get("count"));
return objectMapper.writeValueAsString(agg);
},
Materialized.with(Serdes.String(), Serdes.String())
)
.toStream()
.map((windowedKey, value) -> KeyValue.pair(
"window-" + windowedKey.window().start(), value))
.to("revenue-metrics");
return builder.stream("orders");
}
// 3. Stateful Map with state store
@Bean
public KStream<String, String> statefulProcessing(StreamsBuilder builder) {
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("user-state"),
Serdes.String(), Serdes.String());
builder.addStateStore(storeBuilder);
builder.<String, String>stream("events")
.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
private KeyValueStore<String, String> store;
@Override
public void init(ProcessorContext ctx) {
store = ctx.getStateStore("user-state");
}
@Override
public KeyValue<String, String> transform(String userId, String value) {
    try {
        var state = Optional.ofNullable(store.get(userId))
            .map(s -> parseState(s))
            .orElse(new HashMap<>());
        state.put("lastUpdate", System.currentTimeMillis());
        state.merge("totalEvents", 1, (a, b) -> (int) a + (int) b);
        store.put(userId, objectMapper.writeValueAsString(state));
        return KeyValue.pair(userId, buildEnrichedEvent(value, state));
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);  // Transformer cannot throw checked exceptions
    }
}
}, "user-state")
.to("enriched-events");
return builder.stream("events");
}
}
# pip install faust-streaming (Faust — Python Kafka Streams)
import json
import time
import faust
app = faust.App("stream-processor", broker="kafka://kafka-1:9092")
orders_topic = app.topic("orders", value_type=bytes)
payments_topic = app.topic("payments", value_type=bytes)
fulfillment_topic = app.topic("order-fulfillment", value_type=bytes)
metrics_topic = app.topic("revenue-metrics", value_type=bytes)
enriched_topic = app.topic("enriched-events", value_type=bytes)
# 1. Stream-Stream Join (in-memory within window)
order_table = app.Table("order-stream", default=dict)
payment_table = app.Table("payment-stream", default=dict)
JOIN_WINDOW = 30 # seconds
@app.agent(orders_topic)
async def process_orders(orders):
async for order_bytes in orders:
order = json.loads(order_bytes)
order_id = order["orderId"]
order_table[order_id] = {**order, "ts": time.time()}
await _try_join(order_id)
@app.agent(payments_topic)
async def process_payments(payments):
async for payment_bytes in payments:
payment = json.loads(payment_bytes)
order_id = payment["orderId"]
payment_table[order_id] = {**payment, "ts": time.time()}
await _try_join(order_id)
async def _try_join(order_id: str) -> None:
order = order_table.get(order_id)
payment = payment_table.get(order_id)
if order and payment and abs(order["ts"] - payment["ts"]) <= JOIN_WINDOW:
await fulfillment_topic.send(key=order_id, value=json.dumps({
"orderId": order_id, "order": order, "payment": payment,
"joinedAt": int(time.time() * 1000),
}).encode())
del order_table[order_id]
del payment_table[order_id]
# 2. Windowed Aggregation
window_state = app.Table("revenue-windows", default=lambda: {"count": 0, "sum": 0.0})
WINDOW_ADVANCE = 10 # seconds
@app.agent(orders_topic)
async def windowed_aggregation(events):
async for event_bytes in events:
event = json.loads(event_bytes)
window_key = int(event["timestamp"] / 1000 / WINDOW_ADVANCE)
state = window_state[window_key]
state["count"] += 1
state["sum"] += event.get("amount", 0)
state["avg"] = state["sum"] / state["count"]
window_state[window_key] = state
await metrics_topic.send(key=f"window-{window_key}", value=json.dumps(state).encode())
# 3. Stateful Map
user_state_table = app.Table("user-state", default=lambda: {"events": [], "lastUpdate": 0})
@app.agent(app.topic("events", value_type=bytes))
async def stateful_processing(events):
async for event_bytes in events:
event = json.loads(event_bytes)
user_id = event.get("userId", "unknown")
state = user_state_table[user_id]
state["events"].append(event)
state["lastUpdate"] = int(time.time() * 1000)
user_state_table[user_id] = state
await enriched_topic.send(key=user_id, value=json.dumps({
"event": event,
"userContext": {"totalEvents": len(state["events"]), "lastUpdate": state["lastUpdate"]},
}).encode())
// StreamProcessing with Confluent.Kafka — manual stateful approach
// (For full Kafka Streams API use Java; .NET uses manual consumer patterns)
using Confluent.Kafka;
using System.Collections.Concurrent;
public class KafkaStreamProcessor : BackgroundService
{
private readonly IConsumer<string, string> _consumer;
private readonly IProducer<string, string> _producer;
// 1. Stream-Stream Join state
private readonly ConcurrentDictionary<string, (string Data, long Ts)> _orderStream = new();
private readonly ConcurrentDictionary<string, (string Data, long Ts)> _paymentStream = new();
private const long JoinWindowMs = 30_000;
// 2. Windowed Aggregation state
private readonly ConcurrentDictionary<long, (int Count, double Sum)> _windows = new();
private const long WindowAdvanceMs = 10_000;
// 3. Stateful Map
private readonly ConcurrentDictionary<string, (List<string> Events, long LastUpdate)> _userState = new();
protected override async Task ExecuteAsync(CancellationToken ct)
{
_consumer.Subscribe(new[] { "orders", "payments", "events" });
while (!ct.IsCancellationRequested)
{
var result = _consumer.Consume(ct);
var value = result.Message.Value;
var key = result.Message.Key;
switch (result.Topic)
{
case "orders":
await HandleOrderAsync(key, value);
break;
case "payments":
await HandlePaymentAsync(key, value);
break;
case "events":
await HandleStatefulEventAsync(key, value);
break;
}
}
}
private async Task HandleOrderAsync(string orderId, string data)
{
_orderStream[orderId] = (data, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
await TryJoinAsync(orderId);
}
private async Task HandlePaymentAsync(string orderId, string data)
{
_paymentStream[orderId] = (data, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
await TryJoinAsync(orderId);
}
private async Task TryJoinAsync(string orderId)
{
if (_orderStream.TryGetValue(orderId, out var order) &&
_paymentStream.TryGetValue(orderId, out var payment) &&
Math.Abs(order.Ts - payment.Ts) <= JoinWindowMs)
{
var joined = JsonSerializer.Serialize(new { orderId, order = order.Data, payment = payment.Data, joinedAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() });
await _producer.ProduceAsync("order-fulfillment", new Message<string, string> { Key = orderId, Value = joined });
_orderStream.TryRemove(orderId, out _);
_paymentStream.TryRemove(orderId, out _);
}
}
private async Task HandleStatefulEventAsync(string userId, string eventJson)
{
var (events, _) = _userState.GetOrAdd(userId, _ => ([], 0));
events.Add(eventJson);
_userState[userId] = (events, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
var enriched = JsonSerializer.Serialize(new { Event = eventJson, UserContext = new { TotalEvents = events.Count, LastUpdate = _userState[userId].LastUpdate } });
await _producer.ProduceAsync("enriched-events", new Message<string, string> { Key = userId, Value = enriched });
}
}
Common patterns:
- Stream-Stream Join: correlate events from multiple streams
- Stream-Table Join: enrich stream events with a lookup table (see the sketch after this list)
- Windowed Aggregation: compute metrics over time windows
- Stateful Map: maintain per-key state across events
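Of these, the Stream-Table Join is the only pattern not shown above. A minimal single-worker sketch with Faust, where the `customers` topic and the `customerId`/`orderId` field names are illustrative assumptions:
import json
import faust

app = faust.App("enrichment", broker="kafka://kafka-1:9092")
customers_topic = app.topic("customers", value_type=bytes)  # changelog-like stream
orders_topic = app.topic("orders", value_type=bytes)
enriched_topic = app.topic("orders-enriched", value_type=bytes)

# The "table" side: the latest profile per customer, materialized from the stream
customer_table = app.Table("customer-profiles", default=dict)

@app.agent(customers_topic)
async def materialize_customers(customers):
    async for raw in customers:
        profile = json.loads(raw)
        customer_table[profile["customerId"]] = profile

@app.agent(orders_topic)
async def enrich_orders(orders):
    async for raw in orders:
        order = json.loads(raw)
        profile = customer_table[order["customerId"]]  # table lookup, latest value wins
        await enriched_topic.send(
            key=order["orderId"],
            value=json.dumps({"order": order, "customer": profile}).encode(),
        )
Unlike the stream-stream join, the table side never expires: an order always sees the most recent customer profile, however old the order is.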
Change Data Capture (CDC) with Debezium
CDC streams database changes as events, synchronizing data across systems:
// Debezium MySQL connector configuration
// Deployed via Kafka Connect (the Connect REST API takes flat dotted keys)
const debeziumConfig = {
  name: 'mysql-orders-cdc',
  config: {
    'connector.class': 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname': 'mysql.example.com',
    'database.port': '3306',
    'database.user': 'debezium',
    'database.password': process.env.MYSQL_PASSWORD,
    'database.server.id': '184054',
    'topic.prefix': 'mysql', // 'database.server.name' in Debezium 1.x
    'database.include.list': 'orders_db',
    'table.include.list': 'orders_db.orders',
    // Schema history topic (required by the MySQL connector)
    'schema.history.internal.kafka.bootstrap.servers': 'kafka-1:9092',
    'schema.history.internal.kafka.topic': 'schema-changes.orders_db',
    // Route <prefix>.<db>.<table> topics to <prefix>-<table>, e.g. mysql-orders
    transforms: 'route',
    'transforms.route.type': 'org.apache.kafka.connect.transforms.RegexRouter',
    'transforms.route.regex': '([^.]+)\\.([^.]+)\\.([^.]+)',
    'transforms.route.replacement': '${1}-${3}',
    'key.converter': 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter': 'org.apache.kafka.connect.json.JsonConverter',
  },
};
// CDC messages have structure:
// {
// "before": { old values },
// "after": { new values },
// "source": { database metadata },
// "op": "c|u|d" (create, update, delete),
// "ts_ms": timestamp
// }
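// For example, an UPDATE on the orders table might produce (illustrative values):
// {
//   "before": { "id": "order-123", "status": "pending" },
//   "after":  { "id": "order-123", "status": "paid" },
//   "source": { "connector": "mysql", "db": "orders_db", "table": "orders" },
//   "op": "u",
//   "ts_ms": 1700000000000
// }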
// Consume CDC events and synchronize read model
await consumer.run({
eachMessage: async ({ message }) => {
const change = JSON.parse(message.value);
switch (change.op) {
case 'c': // Create
case 'u': // Update
await syncReadModel('orders', change.after);
break;
case 'd': // Delete
await deleteFromReadModel('orders', change.before.id);
break;
}
// Emit domain events from CDC (change.after is null on deletes, so use change.before)
const record = change.op === 'd' ? change.before : change.after;
await producer.sendEvent('order-events', record.id, {
  eventType: change.op === 'd' ? 'OrderDeleted' : 'OrderUpdated',
  order: record,
  timestamp: change.ts_ms,
});
},
});
// Consume CDC events from Debezium and synchronize read model
@Service
public class CdcConsumerService {
@KafkaListener(topics = "mysql-orders", groupId = "cdc-sync")
public void consumeCdcEvent(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
var change = objectMapper.readValue(record.value(), Map.class);
String op = (String) change.get("op");
switch (op) {
case "c": // Create
case "u": // Update
syncReadModel("orders", (Map<String, Object>) change.get("after"));
break;
case "d": // Delete
var before = (Map<String, Object>) change.get("before");
deleteFromReadModel("orders", (String) before.get("id"));
break;
}
// Emit domain events from CDC ("after" is null on deletes, so use "before")
String eventType = "d".equals(op) ? "OrderDeleted" : "OrderUpdated";
var payload = "d".equals(op)
        ? (Map<String, Object>) change.get("before")
        : (Map<String, Object>) change.get("after");
producerService.sendEvent("order-events", (String) payload.get("id"), Map.of(
    "eventType", eventType,
    "order", payload,
    "timestamp", change.get("ts_ms")
));
ack.acknowledge();
} catch (Exception e) {
log.error("Error processing CDC event", e);
throw new RuntimeException(e);
}
}
private void syncReadModel(String table, Map<String, Object> record) {
    try {
        String json = objectMapper.writeValueAsString(record);
        jdbcTemplate.update(
            "INSERT INTO order_read_model (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?",
            record.get("id"), json, json);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}
private void deleteFromReadModel(String table, String id) {
jdbcTemplate.update("DELETE FROM order_read_model WHERE id = ?", id);
}
}
# Consume CDC events from Debezium and synchronize read model
from aiokafka import AIOKafkaConsumer
import json
class CdcConsumerService:
def __init__(self, producer, db):
self._producer = producer
self._db = db
async def run(self) -> None:
consumer = AIOKafkaConsumer(
"mysql-orders",
bootstrap_servers="kafka-1:9092",
group_id="cdc-sync",
)
await consumer.start()
try:
async for msg in consumer:
await self._handle_cdc_event(json.loads(msg.value.decode()))
finally:
await consumer.stop()
async def _handle_cdc_event(self, change: dict) -> None:
op = change.get("op")
match op:
case "c" | "u": # Create or Update
await self._sync_read_model("orders", change["after"])
case "d": # Delete
await self._delete_from_read_model("orders", change["before"]["id"])
# Emit domain events from CDC
after = change.get("after") or change.get("before", {})
event_type = "OrderDeleted" if op == "d" else "OrderUpdated"
await self._producer.send_event("order-events", after.get("id", ""), {
"eventType": event_type,
"order": after,
"timestamp": change.get("ts_ms"),
})
async def _sync_read_model(self, table: str, record: dict) -> None:
await self._db.execute(
"INSERT INTO order_read_model (id, data) VALUES (:id, :data) "
"ON CONFLICT (id) DO UPDATE SET data = :data",
{"id": record["id"], "data": json.dumps(record)},
)
async def _delete_from_read_model(self, table: str, record_id: str) -> None:
await self._db.execute(
"DELETE FROM order_read_model WHERE id = :id", {"id": record_id}
)
// Consume CDC events from Debezium and synchronize read model
public class CdcConsumerService : BackgroundService
{
private readonly IConsumer<string, string> _consumer;
private readonly KafkaProducerService _producer;
private readonly IDbConnection _db;
protected override async Task ExecuteAsync(CancellationToken ct)
{
_consumer.Subscribe("mysql-orders");
while (!ct.IsCancellationRequested)
{
var result = _consumer.Consume(ct);
var change = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(result.Message.Value)!;
await HandleCdcEventAsync(change);
}
}
private async Task HandleCdcEventAsync(Dictionary<string, JsonElement> change)
{
var op = change["op"].GetString();
switch (op)
{
case "c": // Create
case "u": // Update
var after = change["after"].Deserialize<Dictionary<string, object>>()!;
await SyncReadModelAsync("orders", after);
break;
case "d": // Delete
var before = change["before"].Deserialize<Dictionary<string, object>>()!;
await DeleteFromReadModelAsync("orders", before["id"].ToString()!);
break;
}
// Emit domain events from CDC
var record = op == "d"
? change["before"].Deserialize<Dictionary<string, object>>()!
: change["after"].Deserialize<Dictionary<string, object>>()!;
var eventType = op == "d" ? "OrderDeleted" : "OrderUpdated";
await _producer.SendEventAsync("order-events", record["id"].ToString()!, new
{
eventType,
order = record,
timestamp = change.ContainsKey("ts_ms") ? change["ts_ms"].GetInt64() : 0,
});
}
private async Task SyncReadModelAsync(string table, Dictionary<string, object> record)
{
await _db.ExecuteAsync(
"INSERT INTO order_read_model (id, data) VALUES (@Id, @Data) ON CONFLICT (id) DO UPDATE SET data = @Data",
new { Id = record["id"], Data = JsonSerializer.Serialize(record) });
}
private async Task DeleteFromReadModelAsync(string table, string id) =>
await _db.ExecuteAsync("DELETE FROM order_read_model WHERE id = @Id", new { Id = id });
}
Benefits:
- Maintains eventual consistency across systems
- Near-zero impact on the source database (changes are read from the transaction log, not queried)
- Complete change history
- Enables data synchronization into analytics warehouses
Monitoring: Consumer Lag, Throughput, and Partition Health
Observability is critical for production Kafka systems:
import pino from 'pino';
const logger = pino();
class KafkaMonitoring {
async monitorConsumerLag(
groupId: string,
topic: string,
pollIntervalMs: number = 30000
): Promise<void> {
setInterval(async () => {
const admin = kafka.admin();
await admin.connect();
try {
// kafkajs fetchOffsets returns per-topic entries; partition ids live on partitionId
const [groupOffsets] = await admin.fetchOffsets({ groupId, topics: [topic] });
const { topics: [topicMetadata] } = await admin.fetchTopicMetadata({ topics: [topic] });
for (const partition of topicMetadata.partitions) {
  const consumerOffset = parseInt(
    groupOffsets.partitions.find((p) => p.partition === partition.partitionId)
      ?.offset ?? '0',
    10
  );
  const partitionLogSize = await this.getPartitionLogSize(
    admin,
    topic,
    partition.partitionId
  );
  const lag = partitionLogSize - consumerOffset;
logger.info(
{
groupId,
topic,
partition: partition.partitionId,
consumerOffset,
logSize: partitionLogSize,
lag,
},
'Consumer lag metric'
);
// Alert if lag exceeds threshold
if (lag > 10000) {
logger.warn(
{
groupId,
topic,
partition: partition.partitionId,
lag,
},
'High consumer lag detected'
);
}
}
} finally {
await admin.disconnect();
}
}, pollIntervalMs);
}
async monitorThroughput(topic: string): Promise<void> {
let lastOffset = 0;
let lastTime = Date.now();
setInterval(async () => {
const admin = kafka.admin();
await admin.connect();
try {
const { topics: [topicMeta] } = await admin.fetchTopicMetadata({ topics: [topic] });
let totalOffset = 0;
for (const partition of topicMeta.partitions) {
  const logSize = await this.getPartitionLogSize(
    admin,
    topic,
    partition.partitionId
  );
totalOffset += logSize;
}
const currentTime = Date.now();
const timeDiff = (currentTime - lastTime) / 1000; // seconds
const offsetDiff = totalOffset - lastOffset;
const throughput = offsetDiff / timeDiff;
logger.info(
{
topic,
totalOffset,
throughput: `${Math.round(throughput)} events/sec`,
},
'Topic throughput'
);
lastOffset = totalOffset;
lastTime = currentTime;
} finally {
await admin.disconnect();
}
}, 60000); // Every minute
}
private async getPartitionLogSize(
admin: any,
topic: string,
partition: number
): Promise<number> {
const offsets = await admin.fetchTopicOffsets(topic);
const partitionOffsets = offsets.filter((o) => o.partition === partition);
return partitionOffsets.length > 0 ? parseInt(partitionOffsets[0].high) : 0;
}
}
// Use Prometheus for metrics export
import { register, Gauge, Counter } from 'prom-client';
const consumerLagGauge = new Gauge({
name: 'kafka_consumer_lag',
help: 'Consumer lag in number of messages',
labelNames: ['group', 'topic', 'partition'],
registers: [register],
});
const throughputCounter = new Counter({
name: 'kafka_messages_processed_total',
help: 'Total messages processed',
labelNames: ['topic', 'partition'],
registers: [register],
});
// Spring Boot with Micrometer + Prometheus
import org.apache.kafka.clients.admin.*;
import io.micrometer.core.instrument.*;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@Service
public class KafkaMonitoringService {
private final AdminClient adminClient;
private final MeterRegistry meterRegistry;
private final Map<String, AtomicLong> lagValues = new ConcurrentHashMap<>();
private static final Logger log = LoggerFactory.getLogger(KafkaMonitoringService.class);
@Scheduled(fixedDelay = 30000)
public void monitorConsumerLag() throws Exception {
String groupId = "order-processors";
String topic = "orders";
ListConsumerGroupOffsetsResult offsetsResult = adminClient
.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
offsetsResult.partitionsToOffsetAndMetadata().get();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
adminClient.listOffsets(
consumerOffsets.keySet().stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()))
).all().get();
for (var entry : consumerOffsets.entrySet()) {
var tp = entry.getKey();
long consumerOffset = entry.getValue().offset();
long logSize = endOffsets.get(tp).offset();
long lag = logSize - consumerOffset;
log.info("Consumer lag — group: {}, topic: {}, partition: {}, lag: {}",
groupId, tp.topic(), tp.partition(), lag);
if (lag > 10000) {
log.warn("High consumer lag detected — partition: {}, lag: {}",
tp.partition(), lag);
}
// Export to Prometheus via Micrometer. Re-registering a gauge with the
// same name/tags is a no-op, so bind each partition to a mutable AtomicLong.
lagValues.computeIfAbsent(tp.toString(), key -> meterRegistry.gauge(
        "kafka_consumer_lag",
        Tags.of("group", groupId, "topic", tp.topic(),
                "partition", String.valueOf(tp.partition())),
        new AtomicLong()))
    .set(lag);
}
}
@Scheduled(fixedDelay = 60000)
public void monitorThroughput() throws Exception {
// Compare end offsets over time to calculate throughput
String topic = "orders";
var partitions = adminClient.describeTopics(List.of(topic))
.allTopicNames().get().get(topic).partitions();
var tps = partitions.stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
var offsets = adminClient.listOffsets(
tps.stream().collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()))
).all().get();
long total = offsets.values().stream().mapToLong(r -> r.offset()).sum();
log.info("Topic {} total offset: {}", topic, total);
}
}
# pip install aiokafka prometheus-client
from aiokafka import AIOKafkaConsumer, TopicPartition
from aiokafka.admin import AIOKafkaAdminClient
from prometheus_client import Gauge, Counter, start_http_server
import asyncio
import logging
logger = logging.getLogger(__name__)
consumer_lag_gauge = Gauge(
"kafka_consumer_lag",
"Consumer lag in number of messages",
["group", "topic", "partition"],
)
throughput_counter = Counter(
"kafka_messages_processed_total",
"Total messages processed",
["topic", "partition"],
)
class KafkaMonitoringService:
def __init__(self, brokers: list[str]):
self.brokers = brokers
    async def monitor_consumer_lag(
        self, group_id: str, topic: str, poll_interval_s: int = 30
    ) -> None:
        # Committed offsets come from the admin client; end offsets come from a
        # consumer, since aiokafka's admin client has no list_offsets API
        while True:
            admin = AIOKafkaAdminClient(bootstrap_servers=",".join(self.brokers))
            consumer = AIOKafkaConsumer(bootstrap_servers=",".join(self.brokers))
            await admin.start()
            await consumer.start()
            try:
                consumer_offsets = await admin.list_consumer_group_offsets(group_id)
                end_offsets = await consumer.end_offsets(list(consumer_offsets))
                for tp, offset_meta in consumer_offsets.items():
                    lag = end_offsets[tp] - offset_meta.offset
                    logger.info("Consumer lag — group: %s, topic: %s, partition: %d, lag: %d",
                                group_id, tp.topic, tp.partition, lag)
                    consumer_lag_gauge.labels(
                        group=group_id, topic=tp.topic, partition=str(tp.partition)
                    ).set(lag)
                    if lag > 10000:
                        logger.warning("High consumer lag detected — partition: %d, lag: %d",
                                       tp.partition, lag)
            finally:
                await consumer.stop()
                await admin.close()
            await asyncio.sleep(poll_interval_s)
    async def monitor_throughput(self, topic: str) -> None:
        last_total = 0
        while True:
            consumer = AIOKafkaConsumer(bootstrap_servers=",".join(self.brokers))
            await consumer.start()
            try:
                partitions = await consumer.partitions_for_topic(topic) or set()
                tps = [TopicPartition(topic, p) for p in partitions]
                end_offsets = await consumer.end_offsets(tps)
                total = sum(end_offsets.values())
                # Skip the first sample; there is no previous total to diff against
                throughput = (total - last_total) / 60 if last_total else 0.0
                logger.info("Topic %s throughput: %.1f events/sec", topic, throughput)
                last_total = total
            finally:
                await consumer.stop()
            await asyncio.sleep(60)
// NuGet: Confluent.Kafka, prometheus-net
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Prometheus;
using Microsoft.Extensions.Hosting;
public class KafkaMonitoringService : BackgroundService
{
private readonly IAdminClient _adminClient;
private readonly ILogger<KafkaMonitoringService> _logger;
private static readonly Gauge ConsumerLagGauge = Metrics.CreateGauge(
"kafka_consumer_lag",
"Consumer lag in number of messages",
new GaugeConfiguration { LabelNames = new[] { "group", "topic", "partition" } });
private static readonly Counter ThroughputCounter = Metrics.CreateCounter(
"kafka_messages_processed_total",
"Total messages processed",
new CounterConfiguration { LabelNames = new[] { "topic", "partition" } });
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
await MonitorConsumerLagAsync("order-processors", "orders");
await Task.Delay(30_000, ct);
}
}
private async Task MonitorConsumerLagAsync(string groupId, string topic)
{
try
{
var groupOffsets = await _adminClient.ListConsumerGroupOffsetsAsync(
    new[] { new ConsumerGroupTopicPartitions(groupId, null) }); // null = all partitions
foreach (var tpo in groupOffsets.First().Partitions)
{
    var endOffsets = await _adminClient.ListOffsetsAsync(
        new[] { new TopicPartitionOffsetSpec { TopicPartition = tpo.TopicPartition, OffsetSpec = OffsetSpec.Latest() } },
        new ListOffsetsOptions());
    long consumerOffset = tpo.Offset.Value;
    long logSize = endOffsets.ResultInfos.First().TopicPartitionOffsetError.Offset.Value;
    long lag = logSize - consumerOffset;
_logger.LogInformation(
"Consumer lag — group: {Group}, partition: {Partition}, lag: {Lag}",
groupId, tpo.Partition.Value, lag);
ConsumerLagGauge
.WithLabels(groupId, topic, tpo.Partition.Value.ToString())
.Set(lag);
if (lag > 10000)
_logger.LogWarning("High consumer lag on partition {P}: {Lag}",
tpo.Partition.Value, lag);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error monitoring consumer lag");
}
}
}
Key metrics:
- Consumer lag: the gap between the latest offset and the committed offset
- Throughput: events processed per second
- Rebalancing time: how long partition reassignment takes (see the sketch after this list)
- Partition health: leader/ISR status, disk usage
- Broker metrics: CPU, memory, network I/O
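Of these, rebalancing time is the one no single broker metric reports directly; one option is to time it client-side from the consumer's rebalance callbacks. A sketch with aiokafka, where the group and topic names are placeholders:
import time
import logging
from aiokafka import AIOKafkaConsumer, ConsumerRebalanceListener

logger = logging.getLogger(__name__)

class RebalanceTimer(ConsumerRebalanceListener):
    """Measures revoke-to-assign duration, i.e. how long processing was paused."""
    def __init__(self):
        self._revoked_at = None

    async def on_partitions_revoked(self, revoked):
        self._revoked_at = time.monotonic()  # rebalance begins

    async def on_partitions_assigned(self, assigned):
        if self._revoked_at is not None:
            duration = time.monotonic() - self._revoked_at
            logger.info("Rebalance took %.2fs, %d partitions assigned",
                        duration, len(assigned))

async def run() -> None:
    consumer = AIOKafkaConsumer(
        bootstrap_servers="kafka-1:9092", group_id="order-processors")
    await consumer.start()
    consumer.subscribe(["orders"], listener=RebalanceTimer())
    async for msg in consumer:
        pass  # normal processing here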
Kafka vs. Message Queues Comparison
| Aspect | Kafka | RabbitMQ | AWS SQS |
|---|---|---|---|
| Model | Append-only log | Message queue | Message queue |
| Durability | Persistent by default | Configurable | Persistent |
| Message retention | Configurable duration | Until consumed | 14 days max |
| Replay capability | Native (rewind offset) | No (messages deleted) | No |
| Ordering | Per-partition guarantee | Per-queue guarantee | Best effort |
| Throughput | Millions msg/sec | Hundreds of thousands msg/sec | Nearly unlimited (standard queues) |
| Latency | Milliseconds | Milliseconds | Milliseconds |
| Scaling | Partitioning | Clustering | Auto-scaled |
| Consumer model | Consumer groups | Competing consumers | Visibility timeout |
| Multi-tenancy | Topics + ACLs | Virtual hosts | Separate queues |
| Use case | Event streaming, analytics | Task distribution, pub/sub | Decoupled microservices |
Choose Kafka for:
- Event streaming และ sourcing
- Analytics และ data warehousing
- Multi-consumer replayable streams
- High-throughput systems
- Time-series data
Choose message queues for:
- Simple request/response patterns
- Task distribution and job processing
- Competing consumer model
- Lower operational complexity
Production Deployment Checklist
Cluster Sizing
// Sizing formula considerations:
// 1. Throughput: target events/sec → partition count
const requiredPartitions = Math.ceil(
targetThroughput / MAX_PRODUCER_THROUGHPUT_PER_PARTITION
);
const requiredBrokers = Math.ceil(
requiredPartitions / (PARTITIONS_PER_BROKER || 4000)
);
// 2. Storage: Retention period × daily volume
const dailyVolume = targetThroughput * 86400; // events/day
const dataSize = dailyVolume * AVERAGE_MESSAGE_SIZE; // bytes
const retentionDays = 30;
const requiredStorage = dataSize * retentionDays * REPLICATION_FACTOR;
// 3. Memory: OS page cache for hot partitions
// Rule of thumb: size the page cache to cover the active working set
// Example 3-broker cluster:
// - Throughput: 100k events/sec
// - Retention: 30 days
// - Replication: 3
// - Message size: 1KB average
// → ~25 partitions; ~8.8TB/day × 30 days × RF 3 ≈ 800TB total (~265TB per broker); 64GB memory per broker
// Sizing formula in Java
import java.math.BigDecimal;
public class KafkaSizingCalculator {
// 1. Throughput → partition/broker count
public static int requiredPartitions(long targetThroughput, long maxThroughputPerPartition) {
return (int) Math.ceil((double) targetThroughput / maxThroughputPerPartition);
}
public static int requiredBrokers(int partitions, int partitionsPerBroker) {
return (int) Math.ceil((double) partitions / partitionsPerBroker);
}
// 2. Storage: retention period × daily volume
public static long requiredStorageBytes(
long targetThroughput, long avgMessageSizeBytes,
int retentionDays, int replicationFactor) {
long dailyVolume = targetThroughput * 86_400L;
long dataSize = dailyVolume * avgMessageSizeBytes;
return dataSize * retentionDays * replicationFactor;
}
// Example 3-broker cluster:
// requiredPartitions(100_000, 4_000) → 25 partitions, 3 brokers
// requiredStorageBytes(100_000, 1_024, 30, 3) ≈ 800TB total (~265TB per broker); 64GB memory
}
# Sizing formula in Python
import math
def required_partitions(target_throughput: int, max_throughput_per_partition: int) -> int:
"""1. Throughput → partition count."""
return math.ceil(target_throughput / max_throughput_per_partition)
def required_brokers(partitions: int, partitions_per_broker: int = 4000) -> int:
return math.ceil(partitions / partitions_per_broker)
def required_storage_bytes(
target_throughput: int,
avg_message_size_bytes: int,
retention_days: int = 30,
replication_factor: int = 3,
) -> int:
"""2. Storage: retention period × daily volume."""
daily_volume = target_throughput * 86_400
data_size = daily_volume * avg_message_size_bytes
return data_size * retention_days * replication_factor
# Example 3-broker cluster:
# target=100_000 events/sec, 1KB message, 30-day retention, RF 3
# → ~25 partitions, 3 brokers, ~800TB total storage (~265TB per broker), 64GB memory
// Sizing formula in C#
public static class KafkaSizingCalculator
{
// 1. Throughput → partition/broker count
public static int RequiredPartitions(long targetThroughput, long maxThroughputPerPartition) =>
(int)Math.Ceiling((double)targetThroughput / maxThroughputPerPartition);
public static int RequiredBrokers(int partitions, int partitionsPerBroker = 4000) =>
(int)Math.Ceiling((double)partitions / partitionsPerBroker);
// 2. Storage: retention period × daily volume
public static long RequiredStorageBytes(
long targetThroughput,
long avgMessageSizeBytes,
int retentionDays = 30,
int replicationFactor = 3)
{
long dailyVolume = targetThroughput * 86_400L;
long dataSize = dailyVolume * avgMessageSizeBytes;
return dataSize * retentionDays * replicationFactor;
}
// Example 3-broker cluster:
// targetThroughput=100_000, 1KB message, 30-day retention, RF 3
// → ~25 partitions, 3 brokers, ~800TB total storage (~265TB each), 64GB memory
}
Retention and Cleanup Policies
// Topic configuration
const topicConfig = {
name: 'orders',
numPartitions: 30,
replicationFactor: 3,
configEntries: [
// Retention by time
{ name: 'retention.ms', value: '2592000000' }, // 30 days
// Retention by size
{ name: 'retention.bytes', value: '1099511627776' }, // 1TB
// Log cleanup policy
{ name: 'cleanup.policy', value: 'delete' }, // or 'compact'
// Log compaction ratio (if cleanup.policy=compact)
{ name: 'min.compaction.lag.ms', value: '86400000' }, // 1 day
{ name: 'delete.retention.ms', value: '86400000' },
// Segment size (affects compaction frequency)
{ name: 'segment.ms', value: '86400000' }, // 1 day
{ name: 'segment.bytes', value: '1073741824' }, // 1GB
],
};
// Topic creation with retention policy via Spring Kafka / AdminClient
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
@Bean
public NewTopic ordersTopic() {
return TopicBuilder.name("orders")
.partitions(30)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, "2592000000") // 30 days
.config(TopicConfig.RETENTION_BYTES_CONFIG, "1099511627776") // 1TB
.config(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
.config(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "86400000")
.config(TopicConfig.DELETE_RETENTION_MS_CONFIG, "86400000")
.config(TopicConfig.SEGMENT_MS_CONFIG, "86400000") // 1 day
.config(TopicConfig.SEGMENT_BYTES_CONFIG, "1073741824") // 1GB
.build();
}
# Topic creation with retention policy via confluent_kafka AdminClient
from confluent_kafka.admin import AdminClient, NewTopic
admin = AdminClient({"bootstrap.servers": "kafka-1:9092"})
new_topic = NewTopic(
"orders",
num_partitions=30,
replication_factor=3,
config={
"retention.ms": "2592000000", # 30 days
"retention.bytes": "1099511627776", # 1TB
"cleanup.policy": "delete", # or "compact"
"min.compaction.lag.ms": "86400000",
"delete.retention.ms": "86400000",
"segment.ms": "86400000", # 1 day
"segment.bytes": "1073741824", # 1GB
},
)
# create_topics is asynchronous; wait on the returned futures to confirm creation
futures = admin.create_topics([new_topic])
futures["orders"].result()
// Topic creation with retention policy via Confluent.Kafka AdminClient
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using var adminClient = new AdminClientBuilder(
new AdminClientConfig { BootstrapServers = "kafka-1:9092" }).Build();
await adminClient.CreateTopicsAsync(new[]
{
new TopicSpecification
{
Name = "orders",
NumPartitions = 30,
ReplicationFactor = 3,
Configs = new Dictionary<string, string>
{
["retention.ms"] = "2592000000", // 30 days
["retention.bytes"] = "1099511627776", // 1TB
["cleanup.policy"] = "delete",
["min.compaction.lag.ms"] = "86400000",
["delete.retention.ms"] = "86400000",
["segment.ms"] = "86400000", // 1 day
["segment.bytes"] = "1073741824", // 1GB
},
},
});
Compression and Tuning
// Producer tuning for throughput
const producerConfig = {
compression: CompressionTypes.Snappy, // or Gzip, LZ4, Zstd
linger: 10, // Wait 10ms to batch messages
batchSize: 16384, // 16KB per batch
maxInFlightRequests: 5, // Pipeline requests
acks: 'all', // Full durability
requestTimeout: 30000,
retries: 5,
};
// Broker tuning
const brokerConfig = {
'num.network.threads': 8, // Network threads
'num.io.threads': 8, // I/O threads
'socket.send.buffer.bytes': 102400, // 100KB
'socket.receive.buffer.bytes': 102400,
'socket.request.max.bytes': 104857600, // 100MB
'log.flush.interval.messages': 10000,
'log.flush.interval.ms': 1000,
};
// Spring Kafka: producer tuning for throughput
@Bean
public ProducerFactory<String, String> tunedProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
config.put(ProducerConfig.LINGER_MS_CONFIG, 10); // batch window
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
config.put(ProducerConfig.RETRIES_CONFIG, 5);
return new DefaultKafkaProducerFactory<>(config);
}
// Broker tuning is set in server.properties (not in Java code):
// num.network.threads=8
// num.io.threads=8
// socket.send.buffer.bytes=102400
// socket.receive.buffer.bytes=102400
// socket.request.max.bytes=104857600
// log.flush.interval.messages=10000
// log.flush.interval.ms=1000
# aiokafka: producer tuning for throughput
from aiokafka import AIOKafkaProducer
producer = AIOKafkaProducer(
bootstrap_servers="kafka-1:9092",
compression_type="snappy", # or gzip, lz4, zstd
linger_ms=10, # wait 10ms to batch
max_batch_size=16384, # 16KB per batch
    # (kafka-python's max_in_flight_requests_per_connection has no aiokafka
    # equivalent; enable_idempotence=True guards ordering on retries)
acks="all", # full durability
request_timeout_ms=30000,
retry_backoff_ms=100,
)
# Broker tuning is set in server.properties (not in Python code):
# num.network.threads=8
# num.io.threads=8
# socket.send.buffer.bytes=102400
# socket.receive.buffer.bytes=102400
# socket.request.max.bytes=104857600
# log.flush.interval.messages=10000
# log.flush.interval.ms=1000
// Confluent.Kafka: producer tuning for throughput
var producerConfig = new ProducerConfig
{
CompressionType = CompressionType.Snappy, // or Gzip, Lz4, Zstd
LingerMs = 10, // wait 10ms to batch
BatchSize = 16384, // 16KB per batch
MaxInFlight = 5,
Acks = Acks.All, // full durability
MessageTimeoutMs = 30000,
// Retries handled via EnableIdempotence or retry policy
};
// Broker tuning is set in server.properties (not in C# code):
// num.network.threads=8
// num.io.threads=8
// socket.send.buffer.bytes=102400
// socket.receive.buffer.bytes=102400
// socket.request.max.bytes=104857600
// log.flush.interval.messages=10000
// log.flush.interval.ms=1000
Monitoring and Alerting
// Critical alerts
const alerts = [
{
name: 'HighConsumerLag',
condition: 'consumer_lag > 100000',
severity: 'critical',
action: 'Page on-call engineer',
},
{
name: 'BrokerDown',
condition: 'broker_up == 0',
severity: 'critical',
action: 'Page on-call engineer',
},
{
name: 'ISRShrinking',
condition: 'isr_size < replication_factor',
severity: 'warning',
action: 'Check broker health',
},
{
name: 'ProducerErrors',
condition: 'producer_errors_total > 10',
severity: 'warning',
action: 'Investigate producer failures',
},
{
name: 'HighDiskUsage',
condition: 'disk_usage_percent > 80',
severity: 'warning',
action: 'Scale cluster or increase retention',
},
];
// Alert definitions as Java records — wire into your alerting system
import java.util.List;
public record AlertDefinition(
String name, String condition, String severity, String action) {}
public class KafkaAlertDefinitions {
public static final List<AlertDefinition> ALERTS = List.of(
new AlertDefinition("HighConsumerLag",
"consumer_lag > 100000", "critical", "Page on-call engineer"),
new AlertDefinition("BrokerDown",
"broker_up == 0", "critical", "Page on-call engineer"),
new AlertDefinition("ISRShrinking",
"isr_size < replication_factor", "warning", "Check broker health"),
new AlertDefinition("ProducerErrors",
"producer_errors_total > 10", "warning", "Investigate producer failures"),
new AlertDefinition("HighDiskUsage",
"disk_usage_percent > 80", "warning", "Scale cluster or increase retention")
);
}
# Alert definitions — wire into your alerting system (Prometheus AlertManager, PagerDuty, etc.)
from dataclasses import dataclass
@dataclass
class AlertDefinition:
name: str
condition: str
severity: str
action: str
KAFKA_ALERTS = [
AlertDefinition("HighConsumerLag",
"consumer_lag > 100000", "critical", "Page on-call engineer"),
AlertDefinition("BrokerDown",
"broker_up == 0", "critical", "Page on-call engineer"),
AlertDefinition("ISRShrinking",
"isr_size < replication_factor", "warning", "Check broker health"),
AlertDefinition("ProducerErrors",
"producer_errors_total > 10", "warning", "Investigate producer failures"),
AlertDefinition("HighDiskUsage",
"disk_usage_percent > 80", "warning", "Scale cluster or increase retention"),
]
// Alert definitions — wire into your alerting system (Prometheus AlertManager, PagerDuty, etc.)
public record AlertDefinition(
string Name, string Condition, string Severity, string Action);
public static class KafkaAlertDefinitions
{
public static readonly IReadOnlyList<AlertDefinition> Alerts =
[
new("HighConsumerLag",
"consumer_lag > 100000", "critical", "Page on-call engineer"),
new("BrokerDown",
"broker_up == 0", "critical", "Page on-call engineer"),
new("ISRShrinking",
"isr_size < replication_factor", "warning", "Check broker health"),
new("ProducerErrors",
"producer_errors_total > 10", "warning", "Investigate producer failures"),
new("HighDiskUsage",
"disk_usage_percent > 80", "warning", "Scale cluster or increase retention"),
];
}
Security
// TLS/SSL configuration
const kafkaConfig = {
ssl: {
rejectUnauthorized: true, // Verify broker certificates (never disable in production)
ca: [readFileSync('/path/to/ca-cert.pem', 'utf-8')],
cert: readFileSync('/path/to/client-cert.pem', 'utf-8'),
key: readFileSync('/path/to/client-key.pem', 'utf-8'),
},
sasl: {
mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
username: process.env.KAFKA_USER,
password: process.env.KAFKA_PASSWORD,
},
};
// ACL setup (broker-side)
const acls = [
{
resourceType: 'TOPIC',
resourceName: 'orders',
principal: 'User:order-service',
operation: 'WRITE',
permissionType: 'ALLOW',
},
{
resourceType: 'TOPIC',
resourceName: 'orders',
principal: 'User:order-processor',
operation: 'READ',
permissionType: 'ALLOW',
},
];
// Spring Kafka: TLS/SSL + SASL configuration
@Bean
public ProducerFactory<String, String> secureProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, System.getenv("TRUSTSTORE_PASS"));
config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore.jks");
config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KEYSTORE_PASS"));
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + System.getenv("KAFKA_USER") + "\" " +
"password=\"" + System.getenv("KAFKA_PASSWORD") + "\";");
return new DefaultKafkaProducerFactory<>(config);
}
// ACL creation via AdminClient
public void createAcls(AdminClient adminClient) throws Exception {
var writeAcl = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "orders", PatternType.LITERAL),
new AccessControlEntry("User:order-service", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
var readAcl = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "orders", PatternType.LITERAL),
new AccessControlEntry("User:order-processor", "*", AclOperation.READ, AclPermissionType.ALLOW));
adminClient.createAcls(List.of(writeAcl, readAcl)).all().get();
}
# aiokafka: TLS/SSL + SASL configuration
import ssl
import os
from aiokafka import AIOKafkaProducer
ssl_context = ssl.create_default_context(cafile="/path/to/ca-cert.pem")
ssl_context.load_cert_chain(
certfile="/path/to/client-cert.pem",
keyfile="/path/to/client-key.pem",
)
producer = AIOKafkaProducer(
bootstrap_servers="kafka-1:9092",
security_protocol="SASL_SSL",
ssl_context=ssl_context,
sasl_mechanism="PLAIN", # or SCRAM-SHA-256, SCRAM-SHA-512
sasl_plain_username=os.getenv("KAFKA_USER", ""),
sasl_plain_password=os.getenv("KAFKA_PASSWORD", ""),
)
# ACL creation via confluent_kafka AdminClient
from confluent_kafka.admin import AdminClient, AclBinding, AclOperation, AclPermissionType, ResourceType, ResourcePatternType
admin = AdminClient({"bootstrap.servers": "kafka-1:9092"})
acls = [
AclBinding(ResourceType.TOPIC, "orders", ResourcePatternType.LITERAL,
"User:order-service", "*", AclOperation.WRITE, AclPermissionType.ALLOW),
AclBinding(ResourceType.TOPIC, "orders", ResourcePatternType.LITERAL,
"User:order-processor", "*", AclOperation.READ, AclPermissionType.ALLOW),
]
admin.create_acls(acls)
// Confluent.Kafka: TLS/SSL + SASL configuration
var secureProducerConfig = new ProducerConfig
{
BootstrapServers = "kafka-1:9092",
SecurityProtocol = SecurityProtocol.SaslSsl,
SslCaLocation = "/path/to/ca-cert.pem",
SslCertificateLocation = "/path/to/client-cert.pem",
SslKeyLocation = "/path/to/client-key.pem",
SaslMechanism = SaslMechanism.Plain, // or ScramSha256, ScramSha512
SaslUsername = Environment.GetEnvironmentVariable("KAFKA_USER") ?? "",
SaslPassword = Environment.GetEnvironmentVariable("KAFKA_PASSWORD") ?? "",
};
// ACL creation via AdminClient
using var adminClient = new AdminClientBuilder(
new AdminClientConfig { BootstrapServers = "kafka-1:9092" }).Build();
await adminClient.CreateAclsAsync(new[]
{
    // AclBinding/ResourcePattern/AccessControlEntry use object initializers in Confluent.Kafka
    new AclBinding
    {
        Pattern = new ResourcePattern { Type = ResourceType.Topic, Name = "orders", ResourcePatternType = ResourcePatternType.Literal },
        Entry = new AccessControlEntry { Principal = "User:order-service", Host = "*", Operation = AclOperation.Write, PermissionType = AclPermissionType.Allow },
    },
    new AclBinding
    {
        Pattern = new ResourcePattern { Type = ResourceType.Topic, Name = "orders", ResourcePatternType = ResourcePatternType.Literal },
        Entry = new AccessControlEntry { Principal = "User:order-processor", Host = "*", Operation = AclOperation.Read, PermissionType = AclPermissionType.Allow },
    },
});
Operations and Maintenance
Regular tasks:
- Monitor consumer lag and throughput
- Check broker and ISR health (see the sketch after this list)
- Review log compaction progress
- Test disaster recovery (broker failure scenarios)
- Rotate credentials periodically
- Update Kafka versions safely (rolling upgrades)
- Archive old data to cold storage
- Plan capacity for growth
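For the broker/ISR health check, a short script can flag under-replicated partitions, i.e. partitions whose in-sync replica set is smaller than the assigned replica set. A sketch using confluent_kafka's AdminClient; the broker address is a placeholder:
from confluent_kafka.admin import AdminClient

def find_under_replicated(bootstrap: str = "kafka-1:9092") -> list[tuple[str, int]]:
    admin = AdminClient({"bootstrap.servers": bootstrap})
    metadata = admin.list_topics(timeout=10)  # full cluster metadata
    under_replicated = []
    for topic_name, topic in metadata.topics.items():
        for pid, partition in topic.partitions.items():
            # Healthy partitions have every replica in the ISR
            if len(partition.isrs) < len(partition.replicas):
                under_replicated.append((topic_name, pid))
    return under_replicated

for topic, partition in find_under_replicated():
    print(f"Under-replicated: {topic}[{partition}]")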
Incident response:
- Consumer lag spike: check consumer health; add consumers, up to the partition count
- Message loss: investigate producer acks and replication settings (see the producer sketch below)
- High latency: check broker CPU/memory and network I/O
- Rebalancing storms: increase session timeouts and fix crashing consumers
- Disk full: scale the cluster or shorten the retention window
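For the message-loss case, the first things to verify are the producer's acknowledgement settings and whether delivery failures are actually surfaced. A minimal aiokafka sketch; the topic name and payload are placeholders:
import asyncio
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError

async def send_durably(payload: bytes) -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers="kafka-1:9092",
        acks="all",               # wait for the full in-sync replica set
        enable_idempotence=True,  # broker deduplicates retried batches
    )
    await producer.start()
    try:
        # send_and_wait raises on delivery failure instead of dropping silently
        await producer.send_and_wait("orders", payload)
    except KafkaError as exc:
        # Surface the failure: log, alert, or divert to a dead-letter store
        print(f"Delivery failed: {exc}")
        raise
    finally:
        await producer.stop()

asyncio.run(send_durably(b'{"orderId": "order-123"}'))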
Conclusion
Kafka has changed how engineers build scalable, event-driven systems. The append-only log model unlocks patterns that traditional message queues cannot support, from event sourcing to CQRS to streaming analytics.
Key takeaways:
- Think in events: event streams are a natural abstraction for time-series data
- Plan for scale: design your partition strategy and replication up front
- Embrace idempotency: handle at-least-once semantics with idempotent processing
- Instrument thoroughly: consumer lag and broker health are essential visibility
- Practice operations: test failure scenarios and disaster recovery regularly
- Evolve schemas safely: use a Schema Registry for schema versioning
- Measure and optimize: profile producers and consumers for bottlenecks
Investing in understanding Kafka pays dividends when you build systems that must process millions of events reliably and scale with growth.