Deep Dive: Message Queue Communication
Message queues are the backbone of modern distributed systems. They decouple services, buffer work, and guarantee reliable message delivery across your infrastructure. This guide covers the concepts, implementations, and practical patterns you need to build resilient messaging systems.
What Are Message Queues
Message queues provide asynchronous communication between services. Instead of service A calling service B directly (synchronously), service A places a message on a queue, and service B consumes it when ready. This creates temporal decoupling (the services don't need to be running at the same time) and spatial decoupling (the services don't need to know about each other).
The Point-to-Point Model
In the simplest model, a producer sends messages to a queue and consumers pull messages from it. Each message is processed by exactly one consumer. This differs from pub/sub, where a single message goes to many subscribers.
sequenceDiagram
participant Producer
participant Queue
participant Consumer
Producer->>Queue: Send message ("process_order")
Queue->>Queue: Store message in buffer
Note over Queue: Message persisted to disk
Queue-->>Producer: Acknowledgment
Consumer->>Queue: Poll for messages
Queue-->>Consumer: Return message
Consumer->>Consumer: Process message
Consumer->>Queue: Send acknowledgment
Queue->>Queue: Delete message
Delivery Guarantees
Message queue systems offer three levels of delivery guarantee:
At-Most-Once: Messages may be lost but are never delivered twice. Fire-and-forget semantics.
At-Least-Once: Messages are never lost but may be delivered more than once. Requires idempotent consumers.
Exactly-Once: Each message is delivered exactly once. The strongest guarantee, and the most expensive to provide.
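With at-least-once delivery, the standard defense against duplicates is an idempotent consumer. A minimal sketch (the names are illustrative; in production the seen-ID set would be a durable store, e.g. a database unique constraint):

```python
# Track IDs of messages that have already been processed
processed_ids: set[str] = set()

def handle_once(message_id: str, payload: dict) -> bool:
    """Run the side effects only if this message ID has not been seen before."""
    if message_id in processed_ids:
        return False  # duplicate delivery: safely ignored
    # ... real side effects (charge card, write row) would go here ...
    processed_ids.add(message_id)
    return True

# A redelivered message becomes a no-op the second time
assert handle_once("ORD-1", {"total": 10}) is True
assert handle_once("ORD-1", {"total": 10}) is False
```

This is why the producer examples later attach a unique messageId: it gives consumers a key to deduplicate on.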
Core Principles
- Decoupling: Producers and consumers operate independently, reducing tight coupling.
- Buffering: Queues absorb traffic spikes, preventing cascading failures when consumers fall behind.
- Persistence: Messages are stored durably (usually on disk) so they survive system failures.
- Ordering: Many systems guarantee FIFO ordering within a single queue or partition.
- Acknowledgment: Consumers explicitly confirm processing; unacknowledged messages are retried.
- Backpressure: Mechanisms that keep producers from overwhelming slow consumers.
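Backpressure is commonly implemented as a bounded buffer: when the buffer is full, the producer blocks (or is rejected) instead of piling up unbounded work. Python's standard `queue.Queue` shows the idea in miniature:

```python
import queue
import threading
import time

# Bounded buffer: the producer may have at most 2 unconsumed items in flight
buf: "queue.Queue[int]" = queue.Queue(maxsize=2)

def slow_consumer() -> None:
    for _ in range(4):
        buf.get()
        time.sleep(0.05)  # simulate slow processing
        buf.task_done()

t = threading.Thread(target=slow_consumer)
t.start()
for i in range(4):
    buf.put(i)  # blocks while the buffer is full, throttling the producer
buf.join()  # wait until every item has been processed
t.join()
print("all items drained")
```

Real brokers expose the same principle through credit-based flow control or publisher confirms rather than a blocking put.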
RabbitMQ: Exchange-Based Routing
RabbitMQ uses exchanges to route messages to queues based on routing keys and bindings. This decouples message producers from the queue topology.
Core Components
Exchange: Receives messages from producers and routes them to bound queues. Three main types: Direct (exact routing-key match), Topic (pattern match), Fanout (broadcast).
Queue: Stores messages durably until they are consumed.
Binding: Maps an exchange to one or more queues, together with a routing-key pattern.
Routing Key: A string that producers attach to a message; exchanges use it to decide which queues receive the message.
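Topic-exchange matching treats the routing key as dot-separated words, where `*` matches exactly one word and `#` matches zero or more words. A rough sketch of that matching rule (simplified; the real broker compiles bindings into a trie):

```python
def topic_matches(pattern: str, key: str) -> bool:
    """Approximate AMQP topic matching: '*' = one word, '#' = zero or more words."""
    p, k = pattern.split("."), key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)  # pattern exhausted: must have consumed the whole key
        if p[i] == "#":
            # '#' may consume zero or more of the remaining words
            return any(match(i + 1, j2) for j2 in range(j, len(k) + 1))
        if j == len(k):
            return False
        return (p[i] == "*" or p[i] == k[j]) and match(i + 1, j + 1)

    return match(0, 0)

print(topic_matches("orders.*.created", "orders.CUST-789.created"))  # → True
print(topic_matches("orders.*.created", "orders.CUST-789.deleted"))  # → False
print(topic_matches("orders.#", "orders.CUST-789.created"))          # → True
```

This is why the consumer examples below bind with the pattern `orders.*.created`: it matches the producer's `orders.<customerId>.created` keys regardless of customer.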
RabbitMQ Producer Example
import amqp from 'amqplib';
interface OrderMessage {
orderId: string;
customerId: string;
total: number;
timestamp: Date;
}
class RabbitMQProducer {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect(url: string = 'amqp://localhost'): Promise<void> {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
// Declare exchange (idempotent)
const exchangeName = 'orders';
await this.channel.assertExchange(exchangeName, 'topic', { durable: true });
}
async publishOrder(message: OrderMessage): Promise<boolean> {
if (!this.channel) {
throw new Error('Producer not connected');
}
const exchangeName = 'orders';
const routingKey = `orders.${message.customerId}.created`;
const messageBuffer = Buffer.from(JSON.stringify(message));
// Publish with confirmation
return this.channel.publish(
exchangeName,
routingKey,
messageBuffer,
{
persistent: true,
contentType: 'application/json',
timestamp: Date.now(),
// Generate unique message ID for idempotency
messageId: `${message.orderId}-${Date.now()}`,
}
);
}
async close(): Promise<void> {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
}
}
// Usage
const producer = new RabbitMQProducer();
await producer.connect('amqp://guest:guest@localhost');
await producer.publishOrder({
orderId: 'ORD-12345',
customerId: 'CUST-789',
total: 149.99,
timestamp: new Date(),
});
await producer.close();
// RabbitMQProducer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
public class RabbitMQProducer {
private Connection connection;
private Channel channel;
private static final ObjectMapper mapper = new ObjectMapper();
public record OrderMessage(String orderId, String customerId, double total, Date timestamp) {}
public void connect(String url) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(url);
this.connection = factory.newConnection();
this.channel = connection.createChannel();
// Declare exchange (idempotent)
channel.exchangeDeclare("orders", "topic", true);
}
public boolean publishOrder(OrderMessage message) throws IOException {
if (channel == null) throw new IllegalStateException("Producer not connected");
String exchangeName = "orders";
String routingKey = "orders." + message.customerId() + ".created";
byte[] body = mapper.writeValueAsBytes(message);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // persistent
.contentType("application/json")
.timestamp(new Date())
.messageId(message.orderId() + "-" + System.currentTimeMillis())
.build();
channel.basicPublish(exchangeName, routingKey, props, body);
return true;
}
public void close() throws IOException, TimeoutException {
if (channel != null) channel.close();
if (connection != null) connection.close();
}
// Usage
public static void main(String[] args) throws Exception {
RabbitMQProducer producer = new RabbitMQProducer();
producer.connect("amqp://guest:guest@localhost");
producer.publishOrder(new OrderMessage("ORD-12345", "CUST-789", 149.99, new Date()));
producer.close();
}
}
# rabbitmq_producer.py
import aio_pika
import json
from datetime import datetime
from dataclasses import dataclass, asdict
@dataclass
class OrderMessage:
order_id: str
customer_id: str
total: float
timestamp: str
class RabbitMQProducer:
def __init__(self):
self.connection = None
self.channel = None
async def connect(self, url: str = "amqp://localhost") -> None:
self.connection = await aio_pika.connect_robust(url)
self.channel = await self.connection.channel()
# Declare exchange (idempotent)
self.exchange = await self.channel.declare_exchange(
"orders", aio_pika.ExchangeType.TOPIC, durable=True
)
async def publish_order(self, message: OrderMessage) -> None:
if not self.channel:
raise RuntimeError("Producer not connected")
routing_key = f"orders.{message.customer_id}.created"
body = json.dumps(asdict(message)).encode()
msg = aio_pika.Message(
body=body,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
content_type="application/json",
message_id=f"{message.order_id}-{int(datetime.now().timestamp() * 1000)}",
)
await self.exchange.publish(msg, routing_key=routing_key)
async def close(self) -> None:
if self.connection:
await self.connection.close()
# Usage
import asyncio
async def main():
producer = RabbitMQProducer()
await producer.connect("amqp://guest:guest@localhost")
await producer.publish_order(OrderMessage(
order_id="ORD-12345",
customer_id="CUST-789",
total=149.99,
timestamp=datetime.now().isoformat(),
))
await producer.close()
asyncio.run(main())
// RabbitMQProducer.cs
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;
public record OrderMessage(string OrderId, string CustomerId, double Total, DateTime Timestamp);
public class RabbitMQProducer : IDisposable
{
private IConnection? _connection;
private IModel? _channel;
public void Connect(string url = "amqp://localhost")
{
var factory = new ConnectionFactory { Uri = new Uri(url) };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
// Declare exchange (idempotent)
_channel.ExchangeDeclare("orders", ExchangeType.Topic, durable: true);
}
public bool PublishOrder(OrderMessage message)
{
if (_channel == null) throw new InvalidOperationException("Producer not connected");
var routingKey = $"orders.{message.CustomerId}.created";
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
var props = _channel.CreateBasicProperties();
props.Persistent = true;
props.ContentType = "application/json";
props.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
props.MessageId = $"{message.OrderId}-{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
_channel.BasicPublish("orders", routingKey, props, body);
return true;
}
public void Dispose()
{
_channel?.Close();
_connection?.Close();
}
}
// Usage
using var producer = new RabbitMQProducer();
producer.Connect("amqp://guest:guest@localhost");
producer.PublishOrder(new OrderMessage("ORD-12345", "CUST-789", 149.99, DateTime.UtcNow));
RabbitMQ Consumer with Acknowledgment
import amqp, { Message } from 'amqplib';
interface OrderMessage {
orderId: string;
customerId: string;
total: number;
timestamp: Date;
}
interface ProcessorConfig {
exchangeName: string;
queueName: string;
routingPatterns: string[];
prefetchCount: number;
maxRetries: number;
}
class RabbitMQConsumer {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
private config: ProcessorConfig;
constructor(config: ProcessorConfig) {
this.config = config;
}
async connect(url: string = 'amqp://localhost'): Promise<void> {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
// Set prefetch (QoS): process one message at a time
await this.channel.prefetch(this.config.prefetchCount);
// Declare exchange and queue
await this.channel.assertExchange(this.config.exchangeName, 'topic', {
durable: true,
});
await this.channel.assertQueue(this.config.queueName, {
durable: true,
arguments: {
'x-dead-letter-exchange': `${this.config.exchangeName}-dlx`,
},
});
// Bind queue to exchange with routing patterns
for (const pattern of this.config.routingPatterns) {
await this.channel.bindQueue(
this.config.queueName,
this.config.exchangeName,
pattern
);
}
}
async consume(
handler: (message: OrderMessage) => Promise<void>
): Promise<void> {
if (!this.channel) {
throw new Error('Consumer not connected');
}
await this.channel.consume(
this.config.queueName,
async (msg: Message | null) => {
if (!msg) return;
try {
const content = JSON.parse(msg.content.toString()) as OrderMessage;
const retries = msg.properties.headers?.['x-death']?.length ?? 0;
if (retries > this.config.maxRetries) {
console.error(
`Message ${msg.properties.messageId} exceeded max retries. Moving to DLQ.`
);
// Acknowledge to remove from queue; DLQ will capture it
this.channel!.ack(msg);
return;
}
// Process with timeout
await Promise.race([
handler(content),
new Promise((_, reject) =>
setTimeout(
() => reject(new Error('Processing timeout')),
30000 // 30 second timeout
)
),
]);
// Acknowledge successful processing
this.channel!.ack(msg);
console.log(
`Processed order ${content.orderId} successfully`
);
} catch (error) {
console.error(
`Error processing message ${msg.properties.messageId}:`,
error
);
// Negative acknowledgment: requeue for retry
this.channel!.nack(msg, false, true);
}
},
{ noAck: false } // Manual acknowledgment mode
);
console.log(`Consumer listening on queue: ${this.config.queueName}`);
}
async close(): Promise<void> {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
}
}
// Usage
const consumer = new RabbitMQConsumer({
exchangeName: 'orders',
queueName: 'order-processing-queue',
routingPatterns: ['orders.*.created'],
prefetchCount: 1,
maxRetries: 3,
});
await consumer.connect('amqp://guest:guest@localhost');
await consumer.consume(async (order: OrderMessage) => {
// Simulate processing
console.log(`Processing order ${order.orderId} for customer ${order.customerId}`);
if (order.total < 0) {
throw new Error('Invalid order amount');
}
// Actual processing logic here
await new Promise((resolve) => setTimeout(resolve, 1000));
});
// RabbitMQConsumer.java
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
public class RabbitMQConsumer {
public record ProcessorConfig(
String exchangeName, String queueName,
List<String> routingPatterns, int prefetchCount, int maxRetries) {}
private Connection connection;
private Channel channel;
private final ProcessorConfig config;
private static final ObjectMapper mapper = new ObjectMapper();
public RabbitMQConsumer(ProcessorConfig config) { this.config = config; }
public void connect(String url) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(url);
this.connection = factory.newConnection();
this.channel = connection.createChannel();
channel.basicQos(config.prefetchCount());
channel.exchangeDeclare(config.exchangeName(), "topic", true);
channel.queueDeclare(config.queueName(), true, false, false,
Map.of("x-dead-letter-exchange", config.exchangeName() + "-dlx"));
for (String pattern : config.routingPatterns()) {
channel.queueBind(config.queueName(), config.exchangeName(), pattern);
}
}
public void consume(Consumer<Map<String, Object>> handler) throws IOException {
if (channel == null) throw new IllegalStateException("Consumer not connected");
channel.basicConsume(config.queueName(), false, (consumerTag, delivery) -> {
String messageId = delivery.getProperties().getMessageId();
try {
@SuppressWarnings("unchecked")
Map<String, Object> content = mapper.readValue(
new String(delivery.getBody(), StandardCharsets.UTF_8), Map.class);
// Check retries via x-death header
Map<String, Object> headers = delivery.getProperties().getHeaders();
int retries = headers != null && headers.containsKey("x-death")
? ((List<?>) headers.get("x-death")).size() : 0;
if (retries > config.maxRetries()) {
System.err.println("Message " + messageId + " exceeded max retries. Moving to DLQ.");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
return;
}
handler.accept(content);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("Processed order " + content.get("orderId") + " successfully");
} catch (Exception e) {
System.err.println("Error processing message " + messageId + ": " + e.getMessage());
try {
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
} catch (IOException ex) { ex.printStackTrace(); }
}
}, consumerTag -> {});
System.out.println("Consumer listening on queue: " + config.queueName());
}
public void close() throws IOException, TimeoutException {
if (channel != null) channel.close();
if (connection != null) connection.close();
}
}
# rabbitmq_consumer.py
import aio_pika
import json
import asyncio
from dataclasses import dataclass
from typing import Callable, Awaitable
@dataclass
class ProcessorConfig:
exchange_name: str
queue_name: str
routing_patterns: list[str]
prefetch_count: int
max_retries: int
class RabbitMQConsumer:
def __init__(self, config: ProcessorConfig):
self.config = config
self.connection = None
self.channel = None
async def connect(self, url: str = "amqp://localhost") -> None:
self.connection = await aio_pika.connect_robust(url)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=self.config.prefetch_count)
exchange = await self.channel.declare_exchange(
self.config.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
)
queue = await self.channel.declare_queue(
self.config.queue_name,
durable=True,
arguments={"x-dead-letter-exchange": f"{self.config.exchange_name}-dlx"},
)
for pattern in self.config.routing_patterns:
await queue.bind(exchange, routing_key=pattern)
self.queue = queue
async def consume(self, handler: Callable[[dict], Awaitable[None]]) -> None:
if not self.channel:
raise RuntimeError("Consumer not connected")
async def process_message(message: aio_pika.IncomingMessage):
message_id = message.message_id
async with message.process(requeue=True):
try:
content = json.loads(message.body.decode())
# Check retries
headers = message.headers or {}
retries = len(headers.get("x-death", []))
if retries > self.config.max_retries:
print(f"Message {message_id} exceeded max retries. Moving to DLQ.")
return
await asyncio.wait_for(handler(content), timeout=30)
print(f"Processed order {content.get('order_id')} successfully")
except asyncio.TimeoutError:
print(f"Processing timeout for message {message_id}")
raise
except Exception as e:
print(f"Error processing message {message_id}: {e}")
raise
await self.queue.consume(process_message)
print(f"Consumer listening on queue: {self.config.queue_name}")
async def close(self) -> None:
if self.connection:
await self.connection.close()
# Usage
async def main():
consumer = RabbitMQConsumer(ProcessorConfig(
exchange_name="orders",
queue_name="order-processing-queue",
routing_patterns=["orders.*.created"],
prefetch_count=1,
max_retries=3,
))
await consumer.connect("amqp://guest:guest@localhost")
async def handle_order(order: dict):
print(f"Processing order {order['order_id']} for customer {order['customer_id']}")
if order["total"] < 0:
raise ValueError("Invalid order amount")
await asyncio.sleep(1)
await consumer.consume(handle_order)
await asyncio.Future() # Run forever
asyncio.run(main())
// RabbitMQConsumer.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Text.Json;
public record ProcessorConfig(
string ExchangeName, string QueueName,
string[] RoutingPatterns, int PrefetchCount, int MaxRetries);
public class RabbitMQConsumer : IDisposable
{
private IConnection? _connection;
private IModel? _channel;
private readonly ProcessorConfig _config;
public RabbitMQConsumer(ProcessorConfig config) { _config = config; }
public void Connect(string url = "amqp://localhost")
{
var factory = new ConnectionFactory { Uri = new Uri(url) };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.BasicQos(0, (ushort)_config.PrefetchCount, false);
_channel.ExchangeDeclare(_config.ExchangeName, ExchangeType.Topic, durable: true);
_channel.QueueDeclare(_config.QueueName, durable: true, exclusive: false, autoDelete: false,
arguments: new Dictionary<string, object>
{
["x-dead-letter-exchange"] = $"{_config.ExchangeName}-dlx"
});
foreach (var pattern in _config.RoutingPatterns)
_channel.QueueBind(_config.QueueName, _config.ExchangeName, pattern);
}
public void Consume(Func<Dictionary<string, object>, Task> handler)
{
if (_channel == null) throw new InvalidOperationException("Consumer not connected");
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (_, ea) =>
{
var messageId = ea.BasicProperties.MessageId;
try
{
var content = JsonSerializer.Deserialize<Dictionary<string, object>>(
Encoding.UTF8.GetString(ea.Body.ToArray()))!;
var headers = ea.BasicProperties.Headers;
int retries = headers != null && headers.TryGetValue("x-death", out var xDeath)
? (xDeath as List<object>)?.Count ?? 0 : 0;
if (retries > _config.MaxRetries)
{
Console.Error.WriteLine($"Message {messageId} exceeded max retries. Moving to DLQ.");
_channel.BasicAck(ea.DeliveryTag, false);
return;
}
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await handler(content).WaitAsync(cts.Token);
_channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"Processed order {content.GetValueOrDefault("orderId")} successfully");
}
catch (Exception e)
{
Console.Error.WriteLine($"Error processing message {messageId}: {e.Message}");
_channel.BasicNack(ea.DeliveryTag, false, requeue: true);
}
};
_channel.BasicConsume(_config.QueueName, autoAck: false, consumer: consumer);
Console.WriteLine($"Consumer listening on queue: {_config.QueueName}");
}
public void Dispose()
{
_channel?.Close();
_connection?.Close();
}
}
Amazon SQS: Managed Queue Service
AWS SQS offers two queue types: Standard and FIFO.
Standard vs FIFO Queues
Standard Queues:
- Nearly unlimited throughput
- At-least-once delivery
- Best-effort ordering (not guaranteed)
- Lower latency
- Use for: event notifications, loosely ordered work
FIFO Queues:
- Ordered delivery (strict FIFO within a message group)
- Exactly-once processing
- Limited to 300 transactions per second per API action (up to 3,000 messages per second with batching)
- Higher latency due to the ordering guarantees
- Use for: critical workflows, sequential processing
Visibility Timeout and Acknowledgment
When a consumer receives a message from SQS, the message becomes invisible to other consumers for the visibility timeout period (default 30 seconds). If the consumer does not delete the message before the timeout expires, SQS makes it visible again, effectively requeuing it.
import { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
interface OrderPayload {
orderId: string;
amount: number;
}
class SQSMessageQueue {
private client: SQSClient;
private queueUrl: string;
constructor(region: string, queueUrl: string) {
this.client = new SQSClient({ region });
this.queueUrl = queueUrl;
}
async sendMessage(payload: OrderPayload, delaySeconds: number = 0): Promise<string> {
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(payload),
DelaySeconds: delaySeconds,
MessageAttributes: {
'OrderId': {
StringValue: payload.orderId,
DataType: 'String',
},
'Timestamp': {
StringValue: new Date().toISOString(),
DataType: 'String',
},
},
});
const response = await this.client.send(command);
return response.MessageId!;
}
async receiveMessages(maxMessages: number = 10, visibilityTimeout: number = 30) {
const command = new ReceiveMessageCommand({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: maxMessages,
VisibilityTimeout: visibilityTimeout,
WaitTimeSeconds: 20, // Long polling
MessageAttributeNames: ['All'],
});
const response = await this.client.send(command);
return response.Messages || [];
}
async deleteMessage(receiptHandle: string): Promise<void> {
const command = new DeleteMessageCommand({
QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle,
});
await this.client.send(command);
}
}
// Consumer implementation
class SQSConsumer {
private queue: SQSMessageQueue;
private isRunning: boolean = false;
constructor(queue: SQSMessageQueue) {
this.queue = queue;
}
async start(handler: (payload: OrderPayload) => Promise<void>): Promise<void> {
this.isRunning = true;
while (this.isRunning) {
try {
const messages = await this.queue.receiveMessages(10, 60);
if (messages.length === 0) {
continue;
}
for (const message of messages) {
try {
const payload = JSON.parse(message.Body!) as OrderPayload;
// Process with timeout
await Promise.race([
handler(payload),
new Promise((_, reject) =>
setTimeout(
() => reject(new Error('Processing timeout')),
50000 // 50 second timeout (less than 60 second visibility)
)
),
]);
// Delete message on success
await this.queue.deleteMessage(message.ReceiptHandle!);
console.log(`Processed order ${payload.orderId}`);
} catch (error) {
console.error(
`Error processing message ${message.MessageId}:`,
error
);
// Don't delete; message will reappear after visibility timeout
}
}
} catch (error) {
console.error('Error in consumer loop:', error);
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
}
stop(): void {
this.isRunning = false;
}
}
// SQSMessageQueue.java
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
import software.amazon.awssdk.regions.Region;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
public class SQSMessageQueue {
private final SqsClient client;
private final String queueUrl;
private static final ObjectMapper mapper = new ObjectMapper();
public record OrderPayload(String orderId, double amount) {}
public SQSMessageQueue(String region, String queueUrl) {
this.client = SqsClient.builder().region(Region.of(region)).build();
this.queueUrl = queueUrl;
}
public String sendMessage(OrderPayload payload, int delaySeconds) throws Exception {
SendMessageResponse response = client.sendMessage(SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(mapper.writeValueAsString(payload))
.delaySeconds(delaySeconds)
.messageAttributes(Map.of(
"OrderId", MessageAttributeValue.builder().stringValue(payload.orderId()).dataType("String").build(),
"Timestamp", MessageAttributeValue.builder().stringValue(java.time.Instant.now().toString()).dataType("String").build()
))
.build());
return response.messageId();
}
public List<Message> receiveMessages(int maxMessages, int visibilityTimeout) {
ReceiveMessageResponse response = client.receiveMessage(ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(maxMessages)
.visibilityTimeout(visibilityTimeout)
.waitTimeSeconds(20) // Long polling
.messageAttributeNames("All")
.build());
return response.messages();
}
public void deleteMessage(String receiptHandle) {
client.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(receiptHandle)
.build());
}
}
// SQSConsumer.java
class SQSConsumer {
private final SQSMessageQueue queue;
private volatile boolean isRunning = false;
public SQSConsumer(SQSMessageQueue queue) { this.queue = queue; }
public void start(java.util.function.Consumer<SQSMessageQueue.OrderPayload> handler) {
isRunning = true;
while (isRunning) {
try {
List<Message> messages = queue.receiveMessages(10, 60);
for (Message message : messages) {
try {
SQSMessageQueue.OrderPayload payload = new ObjectMapper()
.readValue(message.body(), SQSMessageQueue.OrderPayload.class);
handler.accept(payload);
queue.deleteMessage(message.receiptHandle());
System.out.println("Processed order " + payload.orderId());
} catch (Exception e) {
System.err.println("Error processing message " + message.messageId() + ": " + e.getMessage());
// Don't delete; message will reappear after visibility timeout
}
}
} catch (Exception e) {
System.err.println("Error in consumer loop: " + e.getMessage());
try { Thread.sleep(5000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
}
}
}
public void stop() { isRunning = false; }
}
# sqs_message_queue.py
import boto3
import json
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Awaitable
@dataclass
class OrderPayload:
order_id: str
amount: float
class SQSMessageQueue:
def __init__(self, region: str, queue_url: str):
self.client = boto3.client("sqs", region_name=region)
self.queue_url = queue_url
def send_message(self, payload: OrderPayload, delay_seconds: int = 0) -> str:
response = self.client.send_message(
QueueUrl=self.queue_url,
MessageBody=json.dumps({"order_id": payload.order_id, "amount": payload.amount}),
DelaySeconds=delay_seconds,
MessageAttributes={
"OrderId": {"StringValue": payload.order_id, "DataType": "String"},
"Timestamp": {"StringValue": datetime.now(timezone.utc).isoformat(), "DataType": "String"},
},
)
return response["MessageId"]
def receive_messages(self, max_messages: int = 10, visibility_timeout: int = 30) -> list:
response = self.client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=max_messages,
VisibilityTimeout=visibility_timeout,
WaitTimeSeconds=20, # Long polling
MessageAttributeNames=["All"],
)
return response.get("Messages", [])
def delete_message(self, receipt_handle: str) -> None:
self.client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=receipt_handle)
class SQSConsumer:
def __init__(self, queue: SQSMessageQueue):
self.queue = queue
self.is_running = False
async def start(self, handler: Callable[[OrderPayload], Awaitable[None]]) -> None:
self.is_running = True
while self.is_running:
try:
messages = self.queue.receive_messages(10, 60)
for message in messages:
try:
data = json.loads(message["Body"])
payload = OrderPayload(order_id=data["order_id"], amount=data["amount"])
await asyncio.wait_for(handler(payload), timeout=50)
self.queue.delete_message(message["ReceiptHandle"])
print(f"Processed order {payload.order_id}")
except Exception as e:
print(f"Error processing message {message['MessageId']}: {e}")
# Don't delete; message will reappear after visibility timeout
except Exception as e:
print(f"Error in consumer loop: {e}")
await asyncio.sleep(5)
def stop(self) -> None:
self.is_running = False
// SQSMessageQueue.cs
using Amazon.SQS;
using Amazon.SQS.Model;
using System.Text.Json;
public record OrderPayload(string OrderId, double Amount);
public class SQSMessageQueue
{
private readonly AmazonSQSClient _client;
private readonly string _queueUrl;
public SQSMessageQueue(string region, string queueUrl)
{
_client = new AmazonSQSClient(Amazon.RegionEndpoint.GetBySystemName(region));
_queueUrl = queueUrl;
}
public async Task<string> SendMessageAsync(OrderPayload payload, int delaySeconds = 0)
{
var response = await _client.SendMessageAsync(new SendMessageRequest
{
QueueUrl = _queueUrl,
MessageBody = JsonSerializer.Serialize(payload),
DelaySeconds = delaySeconds,
MessageAttributes = new Dictionary<string, MessageAttributeValue>
{
["OrderId"] = new() { StringValue = payload.OrderId, DataType = "String" },
["Timestamp"] = new() { StringValue = DateTime.UtcNow.ToString("O"), DataType = "String" },
}
});
return response.MessageId;
}
public async Task<List<Message>> ReceiveMessagesAsync(int maxMessages = 10, int visibilityTimeout = 30)
{
var response = await _client.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = _queueUrl,
MaxNumberOfMessages = maxMessages,
VisibilityTimeout = visibilityTimeout,
WaitTimeSeconds = 20,
MessageAttributeNames = new List<string> { "All" },
});
return response.Messages;
}
public async Task DeleteMessageAsync(string receiptHandle) =>
await _client.DeleteMessageAsync(_queueUrl, receiptHandle);
}
public class SQSConsumer
{
private readonly SQSMessageQueue _queue;
private bool _isRunning;
public SQSConsumer(SQSMessageQueue queue) { _queue = queue; }
public async Task StartAsync(Func<OrderPayload, Task> handler)
{
_isRunning = true;
while (_isRunning)
{
try
{
var messages = await _queue.ReceiveMessagesAsync(10, 60);
foreach (var message in messages)
{
try
{
var payload = JsonSerializer.Deserialize<OrderPayload>(message.Body)!;
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(50));
await handler(payload).WaitAsync(cts.Token);
await _queue.DeleteMessageAsync(message.ReceiptHandle);
Console.WriteLine($"Processed order {payload.OrderId}");
}
catch (Exception e)
{
Console.Error.WriteLine($"Error processing message {message.MessageId}: {e.Message}");
// Don't delete; message will reappear after visibility timeout
}
}
}
catch (Exception e)
{
Console.Error.WriteLine($"Error in consumer loop: {e.Message}");
await Task.Delay(5000);
}
}
}
public void Stop() => _isRunning = false;
}
Message Patterns
Work Queue Pattern
Multiple consumers process tasks from a single queue in parallel.
// Producer
const queue = new SQSMessageQueue('us-east-1', 'https://sqs.us-east-1.amazonaws.com/123456789/task-queue');
for (let i = 0; i < 100; i++) {
await queue.sendMessage({
orderId: `TASK-${i}`,
amount: Math.random() * 1000,
});
}
// Multiple consumers
const consumer = new SQSConsumer(queue);
const handler = async (payload: OrderPayload) => {
console.log(`Processing ${payload.orderId}`);
await new Promise((resolve) => setTimeout(resolve, Math.random() * 5000));
};
// Start 5 parallel consumers
await Promise.all([...Array(5)].map(() => consumer.start(handler)));
// Work Queue Pattern - Producer
SQSMessageQueue queue = new SQSMessageQueue("us-east-1",
"https://sqs.us-east-1.amazonaws.com/123456789/task-queue");
for (int i = 0; i < 100; i++) {
queue.sendMessage(new SQSMessageQueue.OrderPayload("TASK-" + i, Math.random() * 1000), 0);
}
// Work Queue Pattern - Multiple consumers via thread pool
SQSConsumer consumer = new SQSConsumer(queue);
java.util.function.Consumer<SQSMessageQueue.OrderPayload> handler = payload -> {
System.out.println("Processing " + payload.orderId());
try { Thread.sleep((long)(Math.random() * 5000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
};
// Start 5 parallel consumers
java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(() -> consumer.start(handler));
}
# Work Queue Pattern - Producer
import asyncio
import random
queue = SQSMessageQueue("us-east-1",
"https://sqs.us-east-1.amazonaws.com/123456789/task-queue")
for i in range(100):
queue.send_message(OrderPayload(order_id=f"TASK-{i}", amount=random.random() * 1000))
# Multiple consumers
consumer = SQSConsumer(queue)
async def handler(payload: OrderPayload):
print(f"Processing {payload.order_id}")
await asyncio.sleep(random.random() * 5)
# Start 5 parallel consumers
async def main():
await asyncio.gather(*[consumer.start(handler) for _ in range(5)])
asyncio.run(main())
// Work Queue Pattern - Producer
var queue = new SQSMessageQueue("us-east-1",
"https://sqs.us-east-1.amazonaws.com/123456789/task-queue");
for (int i = 0; i < 100; i++)
await queue.SendMessageAsync(new OrderPayload($"TASK-{i}", Random.Shared.NextDouble() * 1000));
// Multiple consumers
var consumer = new SQSConsumer(queue);
Func<OrderPayload, Task> handler = async payload =>
{
Console.WriteLine($"Processing {payload.OrderId}");
await Task.Delay((int)(Random.Shared.NextDouble() * 5000));
};
// Start 5 parallel consumers
await Task.WhenAll(Enumerable.Range(0, 5).Select(_ => consumer.StartAsync(handler)));
Pub/Sub Fanout Pattern
A single exchange routes each message to multiple queues, and each queue is consumed by a different service.
// In RabbitMQ, create fanout exchange
async function setupFanout(channel: amqp.Channel) {
const exchangeName = 'user-events-fanout';
// Fanout: broadcast to all bound queues
await channel.assertExchange(exchangeName, 'fanout', { durable: true });
// Email service queue
await channel.assertQueue('email-queue', { durable: true });
await channel.bindQueue('email-queue', exchangeName, '');
// Analytics queue
await channel.assertQueue('analytics-queue', { durable: true });
await channel.bindQueue('analytics-queue', exchangeName, '');
// Notification queue
await channel.assertQueue('notification-queue', { durable: true });
await channel.bindQueue('notification-queue', exchangeName, '');
return exchangeName;
}
// Single user registration message triggers three consumers
const exchangeName = await setupFanout(channel);
await channel.publish(
exchangeName,
'', // Fanout ignores routing key
Buffer.from(JSON.stringify({ userId: 'user-123', email: 'user@example.com' })),
{ persistent: true }
);
// Pub/Sub Fanout Pattern
import com.rabbitmq.client.Channel;
import com.fasterxml.jackson.databind.ObjectMapper;
public String setupFanout(Channel channel) throws Exception {
String exchangeName = "user-events-fanout";
// Fanout: broadcast to all bound queues
channel.exchangeDeclare(exchangeName, "fanout", true);
channel.queueDeclare("email-queue", true, false, false, null);
channel.queueBind("email-queue", exchangeName, "");
channel.queueDeclare("analytics-queue", true, false, false, null);
channel.queueBind("analytics-queue", exchangeName, "");
channel.queueDeclare("notification-queue", true, false, false, null);
channel.queueBind("notification-queue", exchangeName, "");
return exchangeName;
}
// Publish fanout message
String exchangeName = setupFanout(channel);
byte[] body = new ObjectMapper().writeValueAsBytes(
java.util.Map.of("userId", "user-123", "email", "user@example.com")
);
com.rabbitmq.client.AMQP.BasicProperties props =
new com.rabbitmq.client.AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish(exchangeName, "", props, body); // Fanout ignores routing key
# Pub/Sub Fanout Pattern
import aio_pika
import json
async def setup_fanout(channel: aio_pika.Channel) -> aio_pika.Exchange:
exchange_name = "user-events-fanout"
# Fanout: broadcast to all bound queues
exchange = await channel.declare_exchange(
exchange_name, aio_pika.ExchangeType.FANOUT, durable=True
)
for queue_name in ("email-queue", "analytics-queue", "notification-queue"):
queue = await channel.declare_queue(queue_name, durable=True)
await queue.bind(exchange, routing_key="") # Fanout ignores routing key
return exchange
# Publish fanout message
exchange = await setup_fanout(channel)
await exchange.publish(
aio_pika.Message(
body=json.dumps({"user_id": "user-123", "email": "user@example.com"}).encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
),
routing_key="", # Fanout ignores routing key
)
// Pub/Sub Fanout Pattern
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;
static string SetupFanout(IModel channel)
{
const string exchangeName = "user-events-fanout";
// Fanout: broadcast to all bound queues
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, durable: true);
foreach (var queueName in new[] { "email-queue", "analytics-queue", "notification-queue" })
{
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(queueName, exchangeName, routingKey: ""); // Fanout ignores routing key
}
return exchangeName;
}
// Publish fanout message
var exchangeName = SetupFanout(channel);
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { userId = "user-123", email = "user@example.com" }));
var props = channel.CreateBasicProperties();
props.Persistent = true;
channel.BasicPublish(exchangeName, routingKey: "", props, body); // Fanout ignores routing key
Topic-Based Routing Pattern
Route messages based on hierarchical routing keys (e.g., orders.processing.urgent, orders.completed.standard).
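The bindings below use AMQP topic wildcards: '*' matches exactly one dot-separated word, while '#' matches zero or more words. A minimal Python sketch of that matching rule (an illustration of the semantics, not the broker's actual implementation):

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP-style topic match: '*' = exactly one word, '#' = zero or more."""
    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k  # pattern exhausted: match only if key is too
        if p[0] == "#":
            # '#' may absorb zero or more words of the key
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))
```

For example, 'orders.#' matches both 'orders' and 'orders.processing.urgent', while 'orders.*.urgent' matches 'orders.processing.urgent' but not 'orders.urgent'.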
async function setupTopicRouting(channel: amqp.Channel) {
const exchangeName = 'orders-topic';
await channel.assertExchange(exchangeName, 'topic', { durable: true });
// Queue for all order updates
await channel.assertQueue('all-orders-queue', { durable: true });
await channel.bindQueue('all-orders-queue', exchangeName, 'orders.#'); // All events
// Queue for urgent orders only
await channel.assertQueue('urgent-orders-queue', { durable: true });
await channel.bindQueue('urgent-orders-queue', exchangeName, 'orders.*.urgent');
// Queue for processing orders
await channel.assertQueue('processing-queue', { durable: true });
await channel.bindQueue('processing-queue', exchangeName, 'orders.processing.*');
return exchangeName;
}
// Publish with topic routing
await channel.publish(
'orders-topic',
'orders.processing.urgent',
Buffer.from(JSON.stringify({ orderId: '12345', priority: 'urgent' })),
{ persistent: true }
);
// 'urgent-orders-queue' and 'processing-queue' both receive this message
// 'all-orders-queue' also receives it
// Topic-Based Routing Pattern
import com.rabbitmq.client.Channel;
import com.fasterxml.jackson.databind.ObjectMapper;
public String setupTopicRouting(Channel channel) throws Exception {
String exchangeName = "orders-topic";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare("all-orders-queue", true, false, false, null);
channel.queueBind("all-orders-queue", exchangeName, "orders.#"); // All events
channel.queueDeclare("urgent-orders-queue", true, false, false, null);
channel.queueBind("urgent-orders-queue", exchangeName, "orders.*.urgent");
channel.queueDeclare("processing-queue", true, false, false, null);
channel.queueBind("processing-queue", exchangeName, "orders.processing.*");
return exchangeName;
}
// Publish with topic routing
byte[] body = new ObjectMapper().writeValueAsBytes(
java.util.Map.of("orderId", "12345", "priority", "urgent")
);
com.rabbitmq.client.AMQP.BasicProperties props =
new com.rabbitmq.client.AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish("orders-topic", "orders.processing.urgent", props, body);
// 'urgent-orders-queue' and 'processing-queue' both receive this
// 'all-orders-queue' also receives it
# Topic-Based Routing Pattern
import aio_pika
import json
async def setup_topic_routing(channel: aio_pika.Channel) -> aio_pika.Exchange:
exchange_name = "orders-topic"
exchange = await channel.declare_exchange(
exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
)
bindings = [
("all-orders-queue", "orders.#"), # All events
("urgent-orders-queue", "orders.*.urgent"),
("processing-queue", "orders.processing.*"),
]
for queue_name, routing_key in bindings:
queue = await channel.declare_queue(queue_name, durable=True)
await queue.bind(exchange, routing_key=routing_key)
return exchange
# Publish with topic routing
exchange = await setup_topic_routing(channel)
await exchange.publish(
aio_pika.Message(
body=json.dumps({"order_id": "12345", "priority": "urgent"}).encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
),
routing_key="orders.processing.urgent",
)
# 'urgent-orders-queue' and 'processing-queue' both receive this
# 'all-orders-queue' also receives it
// Topic-Based Routing Pattern
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;
static string SetupTopicRouting(IModel channel)
{
const string exchangeName = "orders-topic";
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, durable: true);
var bindings = new[]
{
("all-orders-queue", "orders.#"), // All events
("urgent-orders-queue", "orders.*.urgent"),
("processing-queue", "orders.processing.*"),
};
foreach (var (queueName, routingKey) in bindings)
{
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(queueName, exchangeName, routingKey);
}
return exchangeName;
}
// Publish with topic routing
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { orderId = "12345", priority = "urgent" }));
var props = channel.CreateBasicProperties();
props.Persistent = true;
channel.BasicPublish("orders-topic", "orders.processing.urgent", props, body);
// 'urgent-orders-queue' and 'processing-queue' both receive this
// 'all-orders-queue' also receives it
Dead-Letter Queues and Retry Strategies
Dead-letter queues (DLQs) capture messages that still fail to process after the maximum number of retries.
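All four implementations below compute a plain exponential delay (initialDelayMs × multiplier^retryCount, capped at maxDelayMs). A common refinement, not shown in them, is to add random jitter so that a burst of simultaneous failures does not retry in lockstep. A minimal sketch, using the same parameter defaults as the examples:

```python
import random

def backoff_delay_ms(retry_count: int, initial_ms: int = 1000,
                     multiplier: float = 2.0, max_ms: int = 30000,
                     jitter: bool = True) -> float:
    """Exponential backoff capped at max_ms, with optional full jitter."""
    delay = min(initial_ms * (multiplier ** retry_count), max_ms)
    # Full jitter: draw uniformly from [0, delay] so many failing
    # consumers spread their retries out instead of herding together
    return random.uniform(0, delay) if jitter else delay
```

With jitter disabled this reproduces the delays in the examples (1000ms, 2000ms, 4000ms, ... capped at 30000ms); with jitter enabled each retry lands somewhere in that window.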
interface RetryConfig {
maxRetries: number;
initialDelayMs: number;
maxDelayMs: number;
backoffMultiplier: number;
}
class MessageProcessor {
private config: RetryConfig;
constructor(config: RetryConfig) {
this.config = config;
}
async setupRabbitMQDLQ(channel: amqp.Channel): Promise<void> {
const mainExchange = 'orders';
const dlxExchange = `${mainExchange}-dlx`;
const dlqName = 'orders-dlq';
// Dead-letter exchange
await channel.assertExchange(dlxExchange, 'direct', { durable: true });
// Dead-letter queue
await channel.assertQueue(dlqName, { durable: true });
await channel.bindQueue(dlqName, dlxExchange, 'orders');
// Main queue with DLX configured
await channel.assertQueue('orders-queue', {
durable: true,
arguments: {
'x-dead-letter-exchange': dlxExchange,
'x-dead-letter-routing-key': 'orders',
'x-max-length': 1000000, // Prevent unbounded growth
},
});
}
calculateBackoffDelay(retryCount: number): number {
const exponentialDelay = this.config.initialDelayMs *
Math.pow(this.config.backoffMultiplier, retryCount);
return Math.min(exponentialDelay, this.config.maxDelayMs);
}
async processWithRetry<T>(
handler: (data: T) => Promise<void>,
data: T,
retryCount: number = 0
): Promise<void> {
try {
await handler(data);
} catch (error) {
if (retryCount < this.config.maxRetries) {
const delayMs = this.calculateBackoffDelay(retryCount);
console.log(
`Retry ${retryCount + 1}/${this.config.maxRetries} after ${delayMs}ms`
);
await new Promise((resolve) => setTimeout(resolve, delayMs));
await this.processWithRetry(handler, data, retryCount + 1);
} else {
console.error(
`Failed after ${this.config.maxRetries} retries. Moving to DLQ.`
);
throw error;
}
}
}
}
// Usage
const processor = new MessageProcessor({
maxRetries: 3,
initialDelayMs: 1000,
maxDelayMs: 30000,
backoffMultiplier: 2,
});
await processor.processWithRetry(
async (order: OrderMessage) => {
// Process logic
if (Math.random() < 0.3) throw new Error('Temporary failure');
console.log('Order processed:', order.orderId);
},
{ orderId: 'ORD-123', customerId: 'CUST-456', total: 99.99, timestamp: new Date() }
);
// MessageProcessor.java
import com.rabbitmq.client.Channel;
import java.util.Map;
import java.util.function.Consumer;
public class MessageProcessor {
public record RetryConfig(int maxRetries, long initialDelayMs, long maxDelayMs, double backoffMultiplier) {}
private final RetryConfig config;
public MessageProcessor(RetryConfig config) { this.config = config; }
public void setupRabbitMQDLQ(Channel channel) throws Exception {
String mainExchange = "orders";
String dlxExchange = mainExchange + "-dlx";
String dlqName = "orders-dlq";
channel.exchangeDeclare(dlxExchange, "direct", true);
channel.queueDeclare(dlqName, true, false, false, null);
channel.queueBind(dlqName, dlxExchange, "orders");
channel.queueDeclare("orders-queue", true, false, false, Map.of(
"x-dead-letter-exchange", dlxExchange,
"x-dead-letter-routing-key", "orders",
"x-max-length", 1_000_000
));
}
public long calculateBackoffDelay(int retryCount) {
long exponentialDelay = (long)(config.initialDelayMs() *
Math.pow(config.backoffMultiplier(), retryCount));
return Math.min(exponentialDelay, config.maxDelayMs());
}
public <T> void processWithRetry(Consumer<T> handler, T data, int retryCount) throws Exception {
try {
handler.accept(data);
} catch (Exception e) {
if (retryCount < config.maxRetries()) {
long delayMs = calculateBackoffDelay(retryCount);
System.out.printf("Retry %d/%d after %dms%n", retryCount + 1, config.maxRetries(), delayMs);
Thread.sleep(delayMs);
processWithRetry(handler, data, retryCount + 1);
} else {
System.err.printf("Failed after %d retries. Moving to DLQ.%n", config.maxRetries());
throw e;
}
}
}
// Usage
public static void main(String[] args) throws Exception {
MessageProcessor processor = new MessageProcessor(
new RetryConfig(3, 1000, 30000, 2.0)
);
processor.processWithRetry(order -> {
if (Math.random() < 0.3) throw new RuntimeException("Temporary failure");
System.out.println("Order processed: " + order);
}, "ORD-123", 0);
}
}
# message_processor.py
import asyncio
import random
from dataclasses import dataclass
from typing import Callable, TypeVar, Awaitable
import aio_pika
T = TypeVar("T")
@dataclass
class RetryConfig:
max_retries: int
initial_delay_ms: int
max_delay_ms: int
backoff_multiplier: float
class MessageProcessor:
def __init__(self, config: RetryConfig):
self.config = config
async def setup_rabbitmq_dlq(self, channel: aio_pika.Channel) -> None:
main_exchange = "orders"
dlx_exchange = f"{main_exchange}-dlx"
dlq_name = "orders-dlq"
dead_exchange = await channel.declare_exchange(dlx_exchange, aio_pika.ExchangeType.DIRECT, durable=True)
dlq = await channel.declare_queue(dlq_name, durable=True)
await dlq.bind(dead_exchange, routing_key="orders")
await channel.declare_queue("orders-queue", durable=True, arguments={
"x-dead-letter-exchange": dlx_exchange,
"x-dead-letter-routing-key": "orders",
"x-max-length": 1_000_000,
})
def calculate_backoff_delay(self, retry_count: int) -> float:
delay = self.config.initial_delay_ms * (self.config.backoff_multiplier ** retry_count)
return min(delay, self.config.max_delay_ms) / 1000 # return seconds
async def process_with_retry(
self,
handler: Callable[[T], Awaitable[None]],
data: T,
retry_count: int = 0,
) -> None:
try:
await handler(data)
except Exception as e:
if retry_count < self.config.max_retries:
delay = self.calculate_backoff_delay(retry_count)
print(f"Retry {retry_count + 1}/{self.config.max_retries} after {delay*1000:.0f}ms")
await asyncio.sleep(delay)
await self.process_with_retry(handler, data, retry_count + 1)
else:
print(f"Failed after {self.config.max_retries} retries. Moving to DLQ.")
raise
# Usage
async def main():
processor = MessageProcessor(RetryConfig(
max_retries=3, initial_delay_ms=1000, max_delay_ms=30000, backoff_multiplier=2.0
))
async def handle_order(order: dict):
if random.random() < 0.3:
raise ValueError("Temporary failure")
print(f"Order processed: {order['order_id']}")
await processor.process_with_retry(
handle_order,
{"order_id": "ORD-123", "customer_id": "CUST-456", "total": 99.99}
)
asyncio.run(main())
// MessageProcessor.cs
using RabbitMQ.Client;
public record RetryConfig(int MaxRetries, int InitialDelayMs, int MaxDelayMs, double BackoffMultiplier);
public class MessageProcessor
{
private readonly RetryConfig _config;
public MessageProcessor(RetryConfig config) { _config = config; }
public void SetupRabbitMQDLQ(IModel channel)
{
const string mainExchange = "orders";
var dlxExchange = $"{mainExchange}-dlx";
const string dlqName = "orders-dlq";
channel.ExchangeDeclare(dlxExchange, ExchangeType.Direct, durable: true);
channel.QueueDeclare(dlqName, durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(dlqName, dlxExchange, "orders");
channel.QueueDeclare("orders-queue", durable: true, exclusive: false, autoDelete: false,
arguments: new Dictionary<string, object>
{
["x-dead-letter-exchange"] = dlxExchange,
["x-dead-letter-routing-key"] = "orders",
["x-max-length"] = 1_000_000,
});
}
public int CalculateBackoffDelay(int retryCount)
{
var exponentialDelay = (int)(_config.InitialDelayMs * Math.Pow(_config.BackoffMultiplier, retryCount));
return Math.Min(exponentialDelay, _config.MaxDelayMs);
}
public async Task ProcessWithRetryAsync<T>(Func<T, Task> handler, T data, int retryCount = 0)
{
try
{
await handler(data);
}
catch (Exception) when (retryCount < _config.MaxRetries)
{
int delayMs = CalculateBackoffDelay(retryCount);
Console.WriteLine($"Retry {retryCount + 1}/{_config.MaxRetries} after {delayMs}ms");
await Task.Delay(delayMs);
await ProcessWithRetryAsync(handler, data, retryCount + 1);
}
catch
{
Console.Error.WriteLine($"Failed after {_config.MaxRetries} retries. Moving to DLQ.");
throw;
}
}
}
// Usage
var processor = new MessageProcessor(new RetryConfig(3, 1000, 30000, 2.0));
await processor.ProcessWithRetryAsync(async (order) =>
{
if (Random.Shared.NextDouble() < 0.3) throw new Exception("Temporary failure");
Console.WriteLine($"Order processed: {order}");
}, new { orderId = "ORD-123", customerId = "CUST-456", total = 99.99 });
Idempotency: Handling Duplicate Messages
Idempotent consumers can safely process the same message multiple times without duplicating side effects.
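Note that the implementations below check is_processed and then mark_processed as two separate steps; with concurrent consumers, two workers can both pass the check before either marks. A common fix is to make claiming atomic — with Redis, a single SET key value NX EX ttl. A minimal in-memory sketch of that claim-once semantics (a stand-in for illustration, not a Redis client):

```python
import time

class InMemoryClaimStore:
    """In-memory stand-in for an atomic claim. claim_once mirrors the
    semantics of Redis SET key "1" NX EX ttl: the check and the mark are
    one step, so only one concurrent consumer can win a given message."""
    def __init__(self) -> None:
        self._claims: dict[str, float] = {}  # message_id -> expiry time

    def claim_once(self, message_id: str, ttl_seconds: float = 86_400) -> bool:
        now = time.monotonic()
        expiry = self._claims.get(message_id)
        if expiry is not None and expiry > now:
            return False  # duplicate delivery: already claimed
        self._claims[message_id] = now + ttl_seconds
        return True  # first claim wins; the caller may process
```

The trade-off: claiming before processing means a consumer that crashes mid-handler leaves the message unretried until the TTL lapses, so real systems tune the TTL or record an explicit processing state rather than a bare flag.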
interface IdempotencyStore {
isProcessed(messageId: string): Promise<boolean>;
markProcessed(messageId: string, expiryMs?: number): Promise<void>;
}
class RedisIdempotencyStore implements IdempotencyStore {
constructor(private redisClient: any) {}
async isProcessed(messageId: string): Promise<boolean> {
const key = `processed:${messageId}`;
const result = await this.redisClient.get(key);
return result !== null;
}
async markProcessed(messageId: string, expiryMs: number = 86400000): Promise<void> {
const key = `processed:${messageId}`;
await this.redisClient.setex(key, Math.ceil(expiryMs / 1000), '1');
}
}
class IdempotentConsumer {
constructor(
private queue: RabbitMQConsumer,
private idempotencyStore: IdempotencyStore
) {}
async consume(
handler: (message: OrderMessage) => Promise<void>
): Promise<void> {
await this.queue.consume(async (message: OrderMessage) => {
// Check idempotency
const alreadyProcessed = await this.idempotencyStore.isProcessed(
message.orderId
);
if (alreadyProcessed) {
console.log(`Message ${message.orderId} already processed. Skipping.`);
return; // Silently skip
}
// Process
await handler(message);
// Mark as processed
await this.idempotencyStore.markProcessed(message.orderId);
});
}
}
// IdempotencyStore.java
import redis.clients.jedis.Jedis;
public interface IdempotencyStore {
boolean isProcessed(String messageId);
void markProcessed(String messageId, long expiryMs);
}
public class RedisIdempotencyStore implements IdempotencyStore {
private final Jedis redis;
public RedisIdempotencyStore(Jedis redis) { this.redis = redis; }
@Override
public boolean isProcessed(String messageId) {
return redis.get("processed:" + messageId) != null;
}
@Override
public void markProcessed(String messageId, long expiryMs) {
redis.setex("processed:" + messageId, (int) Math.ceil(expiryMs / 1000.0), "1");
}
}
public class IdempotentConsumer {
private final RabbitMQConsumer queue;
private final IdempotencyStore idempotencyStore;
public IdempotentConsumer(RabbitMQConsumer queue, IdempotencyStore store) {
this.queue = queue;
this.idempotencyStore = store;
}
public void consume(java.util.function.Consumer<java.util.Map<String, Object>> handler) throws Exception {
queue.consume(message -> {
String orderId = (String) message.get("orderId");
if (idempotencyStore.isProcessed(orderId)) {
System.out.println("Message " + orderId + " already processed. Skipping.");
return;
}
handler.accept(message);
idempotencyStore.markProcessed(orderId, 86_400_000L);
});
}
}
# idempotency.py
import redis.asyncio as aioredis
from typing import Protocol, runtime_checkable
@runtime_checkable
class IdempotencyStore(Protocol):
async def is_processed(self, message_id: str) -> bool: ...
async def mark_processed(self, message_id: str, expiry_ms: int) -> None: ...
class RedisIdempotencyStore:
def __init__(self, redis_client: aioredis.Redis):
self.redis = redis_client
async def is_processed(self, message_id: str) -> bool:
key = f"processed:{message_id}"
return await self.redis.get(key) is not None
async def mark_processed(self, message_id: str, expiry_ms: int = 86_400_000) -> None:
key = f"processed:{message_id}"
await self.redis.setex(key, expiry_ms // 1000, "1")
class IdempotentConsumer:
def __init__(self, queue: RabbitMQConsumer, store: IdempotencyStore):
self.queue = queue
self.store = store
async def consume(self, handler):
async def idempotent_handler(message: dict):
order_id = message.get("order_id") or message.get("orderId")
if await self.store.is_processed(order_id):
print(f"Message {order_id} already processed. Skipping.")
return
await handler(message)
await self.store.mark_processed(order_id)
await self.queue.consume(idempotent_handler)
// Idempotency.cs
using StackExchange.Redis;
public interface IIdempotencyStore
{
Task<bool> IsProcessedAsync(string messageId);
Task MarkProcessedAsync(string messageId, TimeSpan? expiry = null);
}
public class RedisIdempotencyStore : IIdempotencyStore
{
private readonly IDatabase _redis;
public RedisIdempotencyStore(IConnectionMultiplexer multiplexer)
{
_redis = multiplexer.GetDatabase();
}
public async Task<bool> IsProcessedAsync(string messageId)
{
return await _redis.KeyExistsAsync($"processed:{messageId}");
}
public async Task MarkProcessedAsync(string messageId, TimeSpan? expiry = null)
{
await _redis.StringSetAsync(
$"processed:{messageId}", "1",
expiry ?? TimeSpan.FromDays(1));
}
}
public class IdempotentConsumer
{
private readonly RabbitMQConsumer _queue;
private readonly IIdempotencyStore _store;
public IdempotentConsumer(RabbitMQConsumer queue, IIdempotencyStore store)
{
_queue = queue;
_store = store;
}
public void Consume(Func<Dictionary<string, object>, Task> handler)
{
_queue.Consume(async message =>
{
var orderId = message.GetValueOrDefault("orderId")?.ToString() ?? string.Empty;
if (await _store.IsProcessedAsync(orderId))
{
Console.WriteLine($"Message {orderId} already processed. Skipping.");
return;
}
await handler(message);
await _store.MarkProcessedAsync(orderId);
});
}
}
Backpressure and Rate Control
Backpressure prevents producers from overwhelming consumers.
interface BackpressureConfig {
prefetchCount: number;
highWaterMark: number;
lowWaterMark: number;
}
class BackpressureController {
private isPaused: boolean = false;
async handleBackpressure(
channel: amqp.Channel,
queueName: string,
config: BackpressureConfig
): Promise<void> {
// Set prefetch (QoS) to limit in-flight messages
await channel.prefetch(config.prefetchCount);
// Monitor queue depth
const checkQueueDepth = async () => {
const queueInfo = await channel.checkQueue(queueName);
const depth = queueInfo.messageCount;
if (depth > config.highWaterMark && !this.isPaused) {
console.log(
`High backpressure: queue depth ${depth} > ${config.highWaterMark}`
);
this.isPaused = true;
// Pause processing or reduce producer throughput
} else if (depth < config.lowWaterMark && this.isPaused) {
console.log(
`Backpressure relieved: queue depth ${depth} < ${config.lowWaterMark}`
);
this.isPaused = false;
// Resume processing
}
};
setInterval(checkQueueDepth, 5000);
}
}
// Rate limiting producer
class RateLimitedProducer {
private tokens: number;
private lastRefillTime: number;
constructor(
private tokensPerSecond: number,
private maxBurst: number
) {
this.tokens = maxBurst;
this.lastRefillTime = Date.now();
}
async acquire(): Promise<void> {
const now = Date.now();
const timePassed = (now - this.lastRefillTime) / 1000;
this.tokens = Math.min(
this.maxBurst,
this.tokens + timePassed * this.tokensPerSecond
);
this.lastRefillTime = now;
if (this.tokens < 1) {
const waitTime = (1 - this.tokens) / this.tokensPerSecond * 1000;
await new Promise((resolve) => setTimeout(resolve, waitTime));
return this.acquire();
}
this.tokens -= 1;
}
async publishWithRateLimit(
publisher: (msg: any) => Promise<void>,
message: any
): Promise<void> {
await this.acquire();
await publisher(message);
}
}
// BackpressureController.java
import com.rabbitmq.client.Channel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class BackpressureController {
private final AtomicBoolean isPaused = new AtomicBoolean(false);
public void handleBackpressure(Channel channel, String queueName,
int prefetchCount, int highWaterMark, int lowWaterMark) throws Exception {
channel.basicQos(prefetchCount);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
try {
long depth = channel.messageCount(queueName);
if (depth > highWaterMark && !isPaused.get()) {
System.out.printf("High backpressure: queue depth %d > %d%n", depth, highWaterMark);
isPaused.set(true);
} else if (depth < lowWaterMark && isPaused.get()) {
System.out.printf("Backpressure relieved: queue depth %d < %d%n", depth, lowWaterMark);
isPaused.set(false);
}
} catch (Exception e) { e.printStackTrace(); }
}, 0, 5, TimeUnit.SECONDS);
}
}
// Token bucket rate limiter
public class RateLimitedProducer {
private final double tokensPerSecond;
private final double maxBurst;
private double tokens;
private long lastRefillTime;
public RateLimitedProducer(double tokensPerSecond, double maxBurst) {
this.tokensPerSecond = tokensPerSecond;
this.maxBurst = maxBurst;
this.tokens = maxBurst;
this.lastRefillTime = System.currentTimeMillis();
}
public void acquire() throws InterruptedException {
while (true) {
long waitTime;
synchronized (this) {
long now = System.currentTimeMillis();
double timePassed = (now - lastRefillTime) / 1000.0;
tokens = Math.min(maxBurst, tokens + timePassed * tokensPerSecond);
lastRefillTime = now;
if (tokens >= 1) {
tokens -= 1;
return;
}
waitTime = (long)((1 - tokens) / tokensPerSecond * 1000);
}
// Sleep outside the monitor so other producers can refill/acquire
Thread.sleep(waitTime);
}
}
public void publishWithRateLimit(java.util.function.Consumer<Object> publisher, Object message)
throws InterruptedException {
acquire();
publisher.accept(message);
}
}
# backpressure.py
import asyncio
import time
import aio_pika
from dataclasses import dataclass
@dataclass
class BackpressureConfig:
prefetch_count: int
high_water_mark: int
low_water_mark: int
class BackpressureController:
def __init__(self):
self._is_paused = False
async def handle_backpressure(
self, channel: aio_pika.Channel, queue_name: str, config: BackpressureConfig
) -> None:
await channel.set_qos(prefetch_count=config.prefetch_count)
async def check_queue_depth():
while True:
await asyncio.sleep(5)
try:
queue = await channel.get_queue(queue_name, ensure=False)
depth = queue.declaration_result.message_count
if depth > config.high_water_mark and not self._is_paused:
print(f"High backpressure: queue depth {depth} > {config.high_water_mark}")
self._is_paused = True
elif depth < config.low_water_mark and self._is_paused:
print(f"Backpressure relieved: queue depth {depth} < {config.low_water_mark}")
self._is_paused = False
except Exception as e:
print(f"Error checking queue depth: {e}")
asyncio.create_task(check_queue_depth())
class RateLimitedProducer:
def __init__(self, tokens_per_second: float, max_burst: float):
self.tokens_per_second = tokens_per_second
self.max_burst = max_burst
self.tokens = max_burst
self.last_refill_time = time.monotonic()
async def acquire(self) -> None:
now = time.monotonic()
time_passed = now - self.last_refill_time
self.tokens = min(self.max_burst, self.tokens + time_passed * self.tokens_per_second)
self.last_refill_time = now
if self.tokens < 1:
wait_time = (1 - self.tokens) / self.tokens_per_second
await asyncio.sleep(wait_time)
return await self.acquire()
self.tokens -= 1
async def publish_with_rate_limit(self, publisher, message) -> None:
await self.acquire()
await publisher(message)
// BackpressureController.cs
using RabbitMQ.Client;
public record BackpressureConfig(int PrefetchCount, int HighWaterMark, int LowWaterMark);
public class BackpressureController
{
private bool _isPaused;
public async Task HandleBackpressureAsync(IModel channel, string queueName, BackpressureConfig config)
{
channel.BasicQos(0, (ushort)config.PrefetchCount, false);
_ = Task.Run(async () =>
{
while (true)
{
await Task.Delay(5000);
try
{
var queueInfo = channel.QueueDeclarePassive(queueName);
var depth = queueInfo.MessageCount;
if (depth > config.HighWaterMark && !_isPaused)
{
Console.WriteLine($"High backpressure: queue depth {depth} > {config.HighWaterMark}");
_isPaused = true;
}
else if (depth < config.LowWaterMark && _isPaused)
{
Console.WriteLine($"Backpressure relieved: queue depth {depth} < {config.LowWaterMark}");
_isPaused = false;
}
}
catch (Exception e) { Console.Error.WriteLine($"Error checking queue: {e.Message}"); }
}
});
}
}
public class RateLimitedProducer
{
private readonly double _tokensPerSecond;
private readonly double _maxBurst;
private double _tokens;
private DateTimeOffset _lastRefillTime;
private readonly SemaphoreSlim _lock = new(1, 1);
public RateLimitedProducer(double tokensPerSecond, double maxBurst)
{
_tokensPerSecond = tokensPerSecond;
_maxBurst = maxBurst;
_tokens = maxBurst;
_lastRefillTime = DateTimeOffset.UtcNow;
}
public async Task AcquireAsync()
{
while (true)
{
int waitMs;
await _lock.WaitAsync();
try
{
var now = DateTimeOffset.UtcNow;
var timePassed = (now - _lastRefillTime).TotalSeconds;
_tokens = Math.Min(_maxBurst, _tokens + timePassed * _tokensPerSecond);
_lastRefillTime = now;
if (_tokens >= 1)
{
_tokens -= 1;
return;
}
waitMs = (int)((1 - _tokens) / _tokensPerSecond * 1000);
}
finally { _lock.Release(); }
// Wait outside the lock so other producers can proceed
await Task.Delay(waitMs);
}
}
public async Task PublishWithRateLimitAsync(Func<object, Task> publisher, object message)
{
await AcquireAsync();
await publisher(message);
}
}
Monitoring: Queue Depth, Consumer Lag, and Processing Time
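The monitors in this section report the mean processing time; means hide tail latency, so dashboards usually track p95/p99 alongside the average. A small nearest-rank percentile helper (an illustrative sketch, not part of the monitor classes shown here):

```python
import math

def percentile(samples: list[float], p: float) -> float:
    """Nearest-rank percentile for p in (0, 100]; 0.0 when no samples."""
    if not samples:
        return 0.0
    ordered = sorted(samples)
    # Nearest rank: the ceil(p/100 * n)-th smallest sample (1-indexed)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]
```

Feeding the same rolling window of processing times into this helper alongside the mean makes slow outliers visible even when the average looks healthy.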
interface QueueMetrics {
queueDepth: number;
consumerLag: number;
avgProcessingTime: number;
errorRate: number;
throughput: number; // messages/second
}
class QueueMonitor {
private metrics: Map<string, QueueMetrics> = new Map();
private processingTimes: number[] = [];
private errorCount: number = 0;
private successCount: number = 0;
private lastReportTime: number = Date.now();
recordProcessingTime(durationMs: number): void {
this.processingTimes.push(durationMs);
this.successCount++;
// Keep only last 1000 measurements
if (this.processingTimes.length > 1000) {
this.processingTimes.shift();
}
}
recordError(): void {
this.errorCount++;
}
async getMetrics(
channel: amqp.Channel,
queueName: string
): Promise<QueueMetrics> {
const queueInfo = await channel.checkQueue(queueName);
const now = Date.now();
const timeSinceLastReport = (now - this.lastReportTime) / 1000;
const avgProcessingTime =
this.processingTimes.length > 0
? this.processingTimes.reduce((a, b) => a + b, 0) / this.processingTimes.length
: 0;
const throughput =
timeSinceLastReport > 0
? this.successCount / timeSinceLastReport
: 0;
const errorRate =
this.successCount + this.errorCount > 0
? this.errorCount / (this.successCount + this.errorCount)
: 0;
const metrics: QueueMetrics = {
queueDepth: queueInfo.messageCount,
consumerLag: queueInfo.messageCount * (avgProcessingTime / 1000),
avgProcessingTime,
errorRate,
throughput,
};
// Reset counters for next period
this.lastReportTime = now;
this.successCount = 0;
this.errorCount = 0;
return metrics;
}
async reportMetrics(
channel: amqp.Channel,
queueName: string
): Promise<void> {
const metrics = await this.getMetrics(channel, queueName);
console.log(`Queue: ${queueName}`);
console.log(` Depth: ${metrics.queueDepth} messages`);
console.log(` Lag: ${metrics.consumerLag.toFixed(2)} seconds`);
console.log(` Avg Processing Time: ${metrics.avgProcessingTime.toFixed(2)}ms`);
console.log(` Throughput: ${metrics.throughput.toFixed(2)} msg/s`);
console.log(` Error Rate: ${(metrics.errorRate * 100).toFixed(2)}%`);
}
}
// QueueMonitor.java
import com.rabbitmq.client.Channel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;
public class QueueMonitor {
public record QueueMetrics(long queueDepth, double consumerLag, double avgProcessingTime,
double errorRate, double throughput) {}
private final Deque<Long> processingTimes = new ArrayDeque<>();
private final AtomicLong errorCount = new AtomicLong(0);
private final AtomicLong successCount = new AtomicLong(0);
private long lastReportTime = System.currentTimeMillis();
public void recordProcessingTime(long durationMs) {
processingTimes.addLast(durationMs);
successCount.incrementAndGet();
while (processingTimes.size() > 1000) processingTimes.pollFirst();
}
public void recordError() { errorCount.incrementAndGet(); }
public QueueMetrics getMetrics(Channel channel, String queueName) throws Exception {
long queueDepth = channel.messageCount(queueName);
long now = System.currentTimeMillis();
double timeSinceLastReport = (now - lastReportTime) / 1000.0;
double avgProcessingTime = processingTimes.isEmpty() ? 0
: processingTimes.stream().mapToLong(Long::longValue).average().orElse(0);
long sc = successCount.get(), ec = errorCount.get();
double throughput = timeSinceLastReport > 0 ? sc / timeSinceLastReport : 0;
double errorRate = (sc + ec) > 0 ? (double) ec / (sc + ec) : 0;
lastReportTime = now;
successCount.set(0);
errorCount.set(0);
return new QueueMetrics(queueDepth, queueDepth * (avgProcessingTime / 1000),
avgProcessingTime, errorRate, throughput);
}
public void reportMetrics(Channel channel, String queueName) throws Exception {
QueueMetrics m = getMetrics(channel, queueName);
System.out.printf("Queue: %s%n", queueName);
System.out.printf(" Depth: %d messages%n", m.queueDepth());
System.out.printf(" Lag: %.2f seconds%n", m.consumerLag());
System.out.printf(" Avg Processing Time: %.2fms%n", m.avgProcessingTime());
System.out.printf(" Throughput: %.2f msg/s%n", m.throughput());
System.out.printf(" Error Rate: %.2f%%%n", m.errorRate() * 100);
}
}
# queue_monitor.py
import time
import asyncio
from collections import deque
from dataclasses import dataclass
import aio_pika

@dataclass
class QueueMetrics:
    queue_depth: int
    consumer_lag: float
    avg_processing_time: float
    error_rate: float
    throughput: float

class QueueMonitor:
    def __init__(self):
        self.processing_times: deque[float] = deque(maxlen=1000)
        self.error_count = 0
        self.success_count = 0
        self.last_report_time = time.monotonic()

    def record_processing_time(self, duration_ms: float) -> None:
        self.processing_times.append(duration_ms)
        self.success_count += 1

    def record_error(self) -> None:
        self.error_count += 1

    async def get_metrics(self, channel: aio_pika.Channel, queue_name: str) -> QueueMetrics:
        queue = await channel.declare_queue(queue_name, passive=True)
        queue_depth = queue.declaration_result.message_count
        now = time.monotonic()
        time_since_last = now - self.last_report_time
        avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times else 0
        )
        throughput = self.success_count / time_since_last if time_since_last > 0 else 0
        total = self.success_count + self.error_count
        error_rate = self.error_count / total if total > 0 else 0
        # Reset counters for the next reporting period
        self.last_report_time = now
        self.success_count = 0
        self.error_count = 0
        return QueueMetrics(
            queue_depth=queue_depth,
            consumer_lag=queue_depth * (avg_processing_time / 1000),
            avg_processing_time=avg_processing_time,
            error_rate=error_rate,
            throughput=throughput,
        )

    async def report_metrics(self, channel: aio_pika.Channel, queue_name: str) -> None:
        m = await self.get_metrics(channel, queue_name)
        print(f"Queue: {queue_name}")
        print(f"  Depth: {m.queue_depth} messages")
        print(f"  Lag: {m.consumer_lag:.2f} seconds")
        print(f"  Avg Processing Time: {m.avg_processing_time:.2f}ms")
        print(f"  Throughput: {m.throughput:.2f} msg/s")
        print(f"  Error Rate: {m.error_rate * 100:.2f}%")
// QueueMonitor.cs
using RabbitMQ.Client;
using System.Collections.Concurrent;
public record QueueMetrics(
long QueueDepth, double ConsumerLag, double AvgProcessingTime,
double ErrorRate, double Throughput);
public class QueueMonitor
{
private readonly ConcurrentQueue<double> _processingTimes = new();
private long _errorCount;
private long _successCount;
private DateTimeOffset _lastReportTime = DateTimeOffset.UtcNow;
public void RecordProcessingTime(double durationMs)
{
_processingTimes.Enqueue(durationMs);
Interlocked.Increment(ref _successCount);
// Keep only last 1000
while (_processingTimes.Count > 1000)
_processingTimes.TryDequeue(out _);
}
public void RecordError() => Interlocked.Increment(ref _errorCount);
public QueueMetrics GetMetrics(IModel channel, string queueName)
{
var queueInfo = channel.QueueDeclarePassive(queueName);
var now = DateTimeOffset.UtcNow;
var timeSinceLastReport = (now - _lastReportTime).TotalSeconds;
var times = _processingTimes.ToArray();
double avgProcessingTime = times.Length > 0 ? times.Average() : 0;
long sc = Interlocked.Exchange(ref _successCount, 0);
long ec = Interlocked.Exchange(ref _errorCount, 0);
double throughput = timeSinceLastReport > 0 ? sc / timeSinceLastReport : 0;
double errorRate = (sc + ec) > 0 ? (double)ec / (sc + ec) : 0;
_lastReportTime = now;
return new QueueMetrics(
queueInfo.MessageCount,
queueInfo.MessageCount * (avgProcessingTime / 1000),
avgProcessingTime,
errorRate,
throughput);
}
public void ReportMetrics(IModel channel, string queueName)
{
var m = GetMetrics(channel, queueName);
Console.WriteLine($"Queue: {queueName}");
Console.WriteLine($" Depth: {m.QueueDepth} messages");
Console.WriteLine($" Lag: {m.ConsumerLag:F2} seconds");
Console.WriteLine($" Avg Processing Time: {m.AvgProcessingTime:F2}ms");
Console.WriteLine($" Throughput: {m.Throughput:F2} msg/s");
Console.WriteLine($" Error Rate: {m.ErrorRate * 100:F2}%");
}
}
Message Queue Comparison
| Feature | RabbitMQ | Amazon SQS | Redis Streams |
|---|---|---|---|
| Delivery Guarantee | At-most/least/exactly-once | At-least-once | At-least-once |
| Message Ordering | Per-queue (FIFO queues) | FIFO queues only | Partial (consumer groups) |
| Routing | Exchanges + bindings | Simple queue model | Stream keys |
| Persistence | Disk + memory | Durable (AWS managed) | Memory (snapshots) |
| Throughput | High | Very high | High |
| Latency | Low | Low (API call overhead) | Very low |
| Operational Complexity | High (self-hosted) | Low (managed) | Medium |
| Dead-Letter Queues | Built-in | Built-in | Manual implementation |
| Monitoring | Built-in plugin | CloudWatch | Redis-native tools |
| Cost | Infrastructure | Pay-per-request | Infrastructure |
| Best For | Complex routing, high reliability | Scalability, AWS ecosystem | High throughput, low latency |
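Two of the three systems compared above guarantee only at-least-once delivery, so duplicate deliveries are a matter of when, not if, and consumers must deduplicate before applying side effects. A minimal sketch of an idempotent consumer follows; the in-memory set and class name are illustrative (a production system would typically use a shared store such as a Redis set with a TTL):

```python
class IdempotentConsumer:
    """Skips messages whose ID was already processed (safe under at-least-once)."""

    def __init__(self):
        self._seen: set[str] = set()  # illustrative; use a shared store with TTL in production
        self.processed: list[str] = []

    def handle(self, message_id: str, payload: str) -> bool:
        """Returns True if the message was processed, False if it was a duplicate."""
        if message_id in self._seen:
            return False  # duplicate redelivery: acknowledge it, but apply no side effects
        self._seen.add(message_id)
        self.processed.append(payload)  # the real side effect (DB write, API call) goes here
        return True

consumer = IdempotentConsumer()
assert consumer.handle("order-42", "charge card") is True
assert consumer.handle("order-42", "charge card") is False  # redelivery is a no-op
assert consumer.processed == ["charge card"]
```

The key design point is that the dedup check and the side effect must be tied to a stable message ID chosen by the producer, not by the broker, so the ID survives redelivery.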
Production Checklist
- Connection Pooling: Reuse connections; maintain a pool of channels/connections
- Error Handling: Use exponential backoff for transient failures
- Monitoring: Track queue depth, consumer lag, error rates, and processing times
- Graceful Shutdown: Drain in-flight messages before stopping consumers
- Message Serialization: Use versioned schemas (Protocol Buffers, Avro) for backward compatibility
- Idempotency: Design consumers to handle duplicate message deliveries safely
- Dead-Letter Queues: Configure DLQs for failed messages; monitor and alert on DLQ growth
- Acknowledgment Strategy: Use manual acknowledgment (not auto-ack) for at-least-once delivery
- Timeouts: Set consumer processing timeouts shorter than the visibility timeout
- Backpressure: Apply prefetch limits and watch queue depth
- Retry Strategy: Use exponential backoff with jitter; cap retries before routing to the DLQ
- Security: Use TLS in transit; authenticate producers and consumers
- Testing: Run integration tests against an embedded message broker (Testcontainers)
- Documentation: Document message schemas, routing rules, and SLAs
- Disaster Recovery: Test failover scenarios; document recovery procedures
- Message TTL: Set message expiration to prevent unbounded queue buildup
- Circuit Breaker: Stop consuming when downstream services are down; fail fast
- Capacity Planning: Track throughput trends; plan for peak load
- Alerting: Alert on queue depth thresholds, error rate spikes, and consumer lag growth
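The Error Handling, Retry Strategy, and Dead-Letter Queues items above combine into one small control loop: retry with full-jitter exponential backoff, then park the message in a DLQ once retries are exhausted. A minimal sketch (function names and defaults are illustrative, not from any particular client library):

```python
import random
import time

def backoff_with_jitter(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Full-jitter backoff: a uniform sample from [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

def process_with_retry(handler, message, dead_letter, max_retries: int = 5, base: float = 0.5):
    """Calls handler(message), retrying with backoff; routes to the DLQ after max_retries failures."""
    for attempt in range(max_retries):
        try:
            return handler(message)
        except Exception:
            time.sleep(backoff_with_jitter(attempt, base=base))
    dead_letter(message)  # retries exhausted: park the message for later inspection
```

Full jitter spreads retries out so a burst of simultaneous failures does not hammer the broker in lockstep; in a real consumer the `dead_letter` callback would publish to a dedicated DLQ rather than append to a list.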
Message queues are the foundation of reliable distributed systems. The patterns and implementations covered here scale from small teams to enterprise deployments. Start simple, monitor closely, and evolve your queue infrastructure as your system grows.
Previous article: Server-to-Server Communication Technologies