
Message Queues: Server-to-Server Communication Technology

พลากร วรมงคล
April 17, 2025 · 13 min read

“A comprehensive guide to message queues for server-to-server communication, covering RabbitMQ, Amazon SQS, delivery guarantees, dead-letter queues, backpressure, and real-world usage patterns.”

Deep Dive: Message Queues

Message queues are the backbone of modern distributed systems. They decouple services, buffer work, and provide reliable message delivery across your infrastructure. This guide covers the concepts, implementations, and real-world patterns you need to build a stable messaging system.

What Are Message Queues?

Message queues provide asynchronous communication between services. Instead of service A calling service B directly (synchronous), service A places a message on a queue and service B consumes it when it is ready. This gives you temporal decoupling (the services do not have to be running at the same time) and spatial decoupling (the services do not need to know about each other).

The Point-to-Point Model

In the simplest model, a producer sends messages to a queue and consumers pull messages from it. Each message is processed by exactly one consumer. This differs from pub/sub, where a single message goes to multiple subscribers.

sequenceDiagram
    participant Producer
    participant Queue
    participant Consumer

    Producer->>Queue: Send message ("process_order")
    Queue->>Queue: Store message in buffer
    Note over Queue: Message persisted to disk
    Queue-->>Producer: Acknowledgment
    Consumer->>Queue: Poll for messages
    Queue-->>Consumer: Return message
    Consumer->>Consumer: Process message
    Consumer->>Queue: Send acknowledgment
    Queue->>Queue: Delete message
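
The lifecycle in the diagram can be sketched with a toy in-memory queue. This is only an illustration of the send, poll, acknowledge, and delete steps; the class name `InMemoryQueue` and its methods are hypothetical and do not correspond to any broker's API, and nothing here is persisted to disk.

```python
from collections import deque
from itertools import count


class InMemoryQueue:
    """Toy point-to-point queue: a message stays stored until it is acknowledged."""

    def __init__(self):
        self._ready = deque()    # messages waiting for a consumer
        self._in_flight = {}     # delivery tag -> message handed out but not yet acked
        self._tags = count(1)

    def send(self, body):
        self._ready.append(body)
        return True              # stands in for the broker's acknowledgment to the producer

    def poll(self):
        """Hand the oldest message to one consumer, or return None if nothing is ready."""
        if not self._ready:
            return None
        tag = next(self._tags)
        self._in_flight[tag] = self._ready.popleft()
        return tag, self._in_flight[tag]

    def ack(self, tag):
        """Consumer confirms processing; only now is the message deleted."""
        del self._in_flight[tag]

    def nack(self, tag):
        """Consumer failed; put the message back at the front for redelivery."""
        self._ready.appendleft(self._in_flight.pop(tag))


orders = InMemoryQueue()
orders.send("process_order")

tag, body = orders.poll()
orders.nack(tag)             # first attempt fails, so the message is not lost
tag, body = orders.poll()    # the same message is delivered again
orders.ack(tag)              # second attempt succeeds and the message is deleted
```

Because a message is only removed on `ack`, a consumer crash between `poll` and `ack` leaves the message recoverable, which is the property the acknowledgment step in the diagram exists to provide.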

Delivery Guarantees

Message queue systems offer three levels of delivery guarantee:

At-Most-Once: A message may be lost, but it is never delivered twice. Fire-and-forget semantics.

At-Least-Once: A message is never lost, but it may be delivered more than once. Requires idempotent consumers.

Exactly-Once: Each message is delivered exactly once. This is the strongest guarantee and the most expensive one; in practice it is usually built from at-least-once delivery combined with deduplication or transactions.
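
To make the at-least-once requirement concrete, here is a minimal sketch of an idempotent consumer that skips redelivered messages. The class name `IdempotentConsumer` is hypothetical, and the in-memory set is an assumption made for brevity; a real service would need a durable store so that the record of processed IDs survives restarts.

```python
class IdempotentConsumer:
    """Wraps a handler so that a redelivered message is processed only once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()   # assumption: in-memory only; use a durable store in practice

    def on_message(self, message_id, payload):
        """Return True if the payload was processed, False if it was a duplicate."""
        if message_id in self._seen:
            return False
        self._handler(payload)
        # Record the ID only after the handler succeeds, so a failed attempt can be retried.
        self._seen.add(message_id)
        return True


charged = []
consumer = IdempotentConsumer(charged.append)

first = consumer.on_message("ORD-12345", {"total": 149.99})
second = consumer.on_message("ORD-12345", {"total": 149.99})   # redelivery of the same message
```

The deduplication key should be stable across redeliveries (an order ID, for example), otherwise every delivery looks new.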

Core Principles

  • Decoupling: Producers and consumers operate independently, which reduces tight coupling.
  • Buffering: Queues absorb traffic spikes and prevent cascading failures when consumers fall behind.
  • Persistence: Messages are stored durably (usually on disk) so that they survive system failures.
  • Ordering: Many systems guarantee FIFO ordering within a single queue or partition.
  • Acknowledgment: Consumers explicitly confirm processing; unacknowledged messages are redelivered.
  • Backpressure: Mechanisms that keep producers from overwhelming slow consumers.
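
Backpressure is the least intuitive of these principles, so here is a small sketch using a bounded buffer from Python's standard library. The capacity of 2 and the helper name `try_publish` are arbitrary choices for illustration; real brokers signal backpressure in their own ways (blocked connections, rejected publishes, and so on).

```python
import queue

buffer = queue.Queue(maxsize=2)   # bounded buffer: a full buffer is the backpressure signal


def try_publish(item):
    """Return True if the item was accepted, False if the producer should slow down."""
    try:
        buffer.put_nowait(item)
        return True
    except queue.Full:
        return False


results = [try_publish(n) for n in range(3)]   # the third publish is refused
buffer.get_nowait()                            # a consumer drains one message
results.append(try_publish(3))                 # capacity is available again
```

A producer that receives `False` can wait, shed load, or propagate the signal upstream; what matters is that the buffer cannot grow without bound.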

RabbitMQ: Exchange-Based Routing

RabbitMQ uses exchanges to route messages to queues based on routing keys and bindings. This decouples message producers from the queue topology.

Core Components

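Exchange: Receives messages from producers and routes them to the queues bound to it. The three most commonly used types are Direct (routing key match), Topic (pattern match), and Fanout (broadcast); RabbitMQ also provides a Headers type that routes on message headers.

Queue: Stores messages until they are consumed.

Binding: A rule that links an exchange to a queue, optionally with a routing key pattern. An exchange can have bindings to one or more queues.

Routing Key: A string that producers attach to each message. Exchanges use it to decide which queues receive the message.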
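
Topic exchanges match routing keys against binding patterns word by word, where `*` stands for exactly one word and `#` for zero or more words. The function below is a minimal sketch of that matching rule for illustration only; `topic_matches` is a hypothetical helper, not part of RabbitMQ or any client library, and the broker's own implementation is more efficient.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Words are separated by dots; '*' matches exactly one word and
    '#' matches zero or more words.
    """

    def match(p, k):
        if not p:
            return not k                      # pattern exhausted: key must be too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' either matches nothing, or consumes one word and stays active
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False                      # pattern still expects a word
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


matches_created = topic_matches("orders.*.created", "orders.CUST-789.created")
matches_cancelled = topic_matches("orders.*.created", "orders.CUST-789.cancelled")
matches_everything = topic_matches("orders.#", "orders.CUST-789.created")
```

With a pattern such as `orders.*.created`, a queue receives creation events for every customer without the producer knowing which queues exist.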

RabbitMQ Producer Example

import amqp from 'amqplib';

interface OrderMessage {
  orderId: string;
  customerId: string;
  total: number;
  timestamp: Date;
}

class RabbitMQProducer {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;

  async connect(url: string = 'amqp://localhost'): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Declare exchange (idempotent)
    const exchangeName = 'orders';
    await this.channel.assertExchange(exchangeName, 'topic', { durable: true });
  }

  async publishOrder(message: OrderMessage): Promise<boolean> {
    if (!this.channel) {
      throw new Error('Producer not connected');
    }

    const exchangeName = 'orders';
    const routingKey = `orders.${message.customerId}.created`;
    const messageBuffer = Buffer.from(JSON.stringify(message));

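    // publish() returns false when the channel's write buffer is full
    // (a backpressure signal); it is not a broker confirmation. Use
    // createConfirmChannel() if you need publisher confirms.
    return this.channel.publish(
      exchangeName,
      routingKey,
      messageBuffer,
      {
        persistent: true,
        contentType: 'application/json',
        timestamp: Math.floor(Date.now() / 1000), // AMQP timestamps are in seconds
        // Message ID for tracing. The timestamp suffix changes on every
        // publish, so consumers should deduplicate on orderId, not on this ID.
        messageId: `${message.orderId}-${Date.now()}`,
      }
    );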
    );
  }

  async close(): Promise<void> {
    if (this.channel) {
      await this.channel.close();
    }
    if (this.connection) {
      await this.connection.close();
    }
  }
}

// Usage
const producer = new RabbitMQProducer();
await producer.connect('amqp://guest:guest@localhost');

await producer.publishOrder({
  orderId: 'ORD-12345',
  customerId: 'CUST-789',
  total: 149.99,
  timestamp: new Date(),
});

await producer.close();

// RabbitMQProducer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

public class RabbitMQProducer {
    private Connection connection;
    private Channel channel;
    private static final ObjectMapper mapper = new ObjectMapper();

    public record OrderMessage(String orderId, String customerId, double total, Date timestamp) {}

    public void connect(String url) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(url);
        this.connection = factory.newConnection();
        this.channel = connection.createChannel();

        // Declare exchange (idempotent)
        channel.exchangeDeclare("orders", "topic", true);
    }

    public boolean publishOrder(OrderMessage message) throws IOException {
        if (channel == null) throw new IllegalStateException("Producer not connected");

        String exchangeName = "orders";
        String routingKey = "orders." + message.customerId() + ".created";
        byte[] body = mapper.writeValueAsBytes(message);

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .deliveryMode(2) // persistent
            .contentType("application/json")
            .timestamp(new Date())
            .messageId(message.orderId() + "-" + System.currentTimeMillis())
            .build();

        channel.basicPublish(exchangeName, routingKey, props, body);
        return true;
    }

    public void close() throws IOException, TimeoutException {
        if (channel != null) channel.close();
        if (connection != null) connection.close();
    }

    // Usage
    public static void main(String[] args) throws Exception {
        RabbitMQProducer producer = new RabbitMQProducer();
        producer.connect("amqp://guest:guest@localhost");

        producer.publishOrder(new OrderMessage("ORD-12345", "CUST-789", 149.99, new Date()));

        producer.close();
    }
}

# rabbitmq_producer.py
import aio_pika
import json
from datetime import datetime
from dataclasses import dataclass, asdict


@dataclass
class OrderMessage:
    order_id: str
    customer_id: str
    total: float
    timestamp: str


class RabbitMQProducer:
    def __init__(self):
        self.connection = None
        self.channel = None

    async def connect(self, url: str = "amqp://localhost") -> None:
        self.connection = await aio_pika.connect_robust(url)
        self.channel = await self.connection.channel()

        # Declare exchange (idempotent)
        self.exchange = await self.channel.declare_exchange(
            "orders", aio_pika.ExchangeType.TOPIC, durable=True
        )

    async def publish_order(self, message: OrderMessage) -> None:
        if not self.channel:
            raise RuntimeError("Producer not connected")

        routing_key = f"orders.{message.customer_id}.created"
        body = json.dumps(asdict(message)).encode()

        msg = aio_pika.Message(
            body=body,
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            content_type="application/json",
            message_id=f"{message.order_id}-{int(datetime.now().timestamp() * 1000)}",
        )
        await self.exchange.publish(msg, routing_key=routing_key)

    async def close(self) -> None:
        if self.connection:
            await self.connection.close()


# Usage
import asyncio

async def main():
    producer = RabbitMQProducer()
    await producer.connect("amqp://guest:guest@localhost")

    await producer.publish_order(OrderMessage(
        order_id="ORD-12345",
        customer_id="CUST-789",
        total=149.99,
        timestamp=datetime.now().isoformat(),
    ))

    await producer.close()

asyncio.run(main())

// RabbitMQProducer.cs
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

public record OrderMessage(string OrderId, string CustomerId, double Total, DateTime Timestamp);

public class RabbitMQProducer : IDisposable
{
    private IConnection? _connection;
    private IModel? _channel;

    public void Connect(string url = "amqp://localhost")
    {
        var factory = new ConnectionFactory { Uri = new Uri(url) };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        // Declare exchange (idempotent)
        _channel.ExchangeDeclare("orders", ExchangeType.Topic, durable: true);
    }

    public bool PublishOrder(OrderMessage message)
    {
        if (_channel == null) throw new InvalidOperationException("Producer not connected");

        var routingKey = $"orders.{message.CustomerId}.created";
        var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));

        var props = _channel.CreateBasicProperties();
        props.Persistent = true;
        props.ContentType = "application/json";
        props.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
        props.MessageId = $"{message.OrderId}-{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";

        _channel.BasicPublish("orders", routingKey, props, body);
        return true;
    }

    public void Dispose()
    {
        _channel?.Close();
        _connection?.Close();
    }
}

// Usage
using var producer = new RabbitMQProducer();
producer.Connect("amqp://guest:guest@localhost");
producer.PublishOrder(new OrderMessage("ORD-12345", "CUST-789", 149.99, DateTime.UtcNow));

RabbitMQ Consumer with Acknowledgment

import amqp, { Message } from 'amqplib';

interface OrderMessage {
  orderId: string;
  customerId: string;
  total: number;
  timestamp: Date;
}

interface ProcessorConfig {
  exchangeName: string;
  queueName: string;
  routingPatterns: string[];
  prefetchCount: number;
  maxRetries: number;
}

class RabbitMQConsumer {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;
  private config: ProcessorConfig;

  constructor(config: ProcessorConfig) {
    this.config = config;
  }

  async connect(url: string = 'amqp://localhost'): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Set prefetch (QoS): cap the number of unacknowledged messages in flight
    await this.channel.prefetch(this.config.prefetchCount);

    // Declare exchange and queue
    await this.channel.assertExchange(this.config.exchangeName, 'topic', {
      durable: true,
    });

    await this.channel.assertQueue(this.config.queueName, {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': `${this.config.exchangeName}-dlx`,
      },
    });

    // Bind queue to exchange with routing patterns
    for (const pattern of this.config.routingPatterns) {
      await this.channel.bindQueue(
        this.config.queueName,
        this.config.exchangeName,
        pattern
      );
    }
  }

  async consume(
    handler: (message: OrderMessage) => Promise<void>
  ): Promise<void> {
    if (!this.channel) {
      throw new Error('Consumer not connected');
    }

    await this.channel.consume(
      this.config.queueName,
      async (msg: Message | null) => {
        if (!msg) return;

        try {
          const content = JSON.parse(msg.content.toString()) as OrderMessage;
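          // Each x-death entry carries a count of dead-letter events for one
          // queue/reason pair; read the count from the first entry.
          const retries = msg.properties.headers?.['x-death']?.[0]?.count ?? 0;

          if (retries > this.config.maxRetries) {
            console.error(
              `Message ${msg.properties.messageId} exceeded max retries. Moving to DLQ.`
            );
            // Reject without requeue so the broker routes it to the dead-letter exchange
            this.channel!.nack(msg, false, false);
            return;
          }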

          // Process with timeout
          await Promise.race([
            handler(content),
            new Promise((_, reject) =>
              setTimeout(
                () => reject(new Error('Processing timeout')),
                30000 // 30 second timeout
              )
            ),
          ]);

          // Acknowledge successful processing
          this.channel!.ack(msg);
          console.log(
            `Processed order ${content.orderId} successfully`
          );
        } catch (error) {
          console.error(
            `Error processing message ${msg.properties.messageId}:`,
            error
          );

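          // Negative acknowledgment with requeue: the message goes straight back
          // onto this queue. It does not pass through the dead-letter exchange,
          // so the x-death count only grows for messages that were dead-lettered
          // and routed back (for example through a delayed retry queue).
          this.channel!.nack(msg, false, true);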
        }
      },
      { noAck: false } // Manual acknowledgment mode
    );

    console.log(`Consumer listening on queue: ${this.config.queueName}`);
  }

  async close(): Promise<void> {
    if (this.channel) {
      await this.channel.close();
    }
    if (this.connection) {
      await this.connection.close();
    }
  }
}

// Usage
const consumer = new RabbitMQConsumer({
  exchangeName: 'orders',
  queueName: 'order-processing-queue',
  routingPatterns: ['orders.*.created'],
  prefetchCount: 1,
  maxRetries: 3,
});

await consumer.connect('amqp://guest:guest@localhost');

await consumer.consume(async (order: OrderMessage) => {
  // Simulate processing
  console.log(`Processing order ${order.orderId} for customer ${order.customerId}`);
  
  if (order.total < 0) {
    throw new Error('Invalid order amount');
  }

  // Actual processing logic here
  await new Promise((resolve) => setTimeout(resolve, 1000));
});

// RabbitMQConsumer.java
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class RabbitMQConsumer {

    public record ProcessorConfig(
        String exchangeName, String queueName,
        List<String> routingPatterns, int prefetchCount, int maxRetries) {}

    private Connection connection;
    private Channel channel;
    private final ProcessorConfig config;
    private static final ObjectMapper mapper = new ObjectMapper();

    public RabbitMQConsumer(ProcessorConfig config) { this.config = config; }

    public void connect(String url) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(url);
        this.connection = factory.newConnection();
        this.channel = connection.createChannel();

        channel.basicQos(config.prefetchCount());

        channel.exchangeDeclare(config.exchangeName(), "topic", true);
        channel.queueDeclare(config.queueName(), true, false, false,
            Map.of("x-dead-letter-exchange", config.exchangeName() + "-dlx"));

        for (String pattern : config.routingPatterns()) {
            channel.queueBind(config.queueName(), config.exchangeName(), pattern);
        }
    }

    public void consume(Consumer<Map<String, Object>> handler) throws IOException {
        if (channel == null) throw new IllegalStateException("Consumer not connected");

        channel.basicConsume(config.queueName(), false, (consumerTag, delivery) -> {
            String messageId = delivery.getProperties().getMessageId();
            try {
                @SuppressWarnings("unchecked")
                Map<String, Object> content = mapper.readValue(
                    new String(delivery.getBody(), StandardCharsets.UTF_8), Map.class);

                // Read the dead-letter count from the first x-death entry
                Map<String, Object> headers = delivery.getProperties().getHeaders();
                long retries = 0;
                if (headers != null
                        && headers.get("x-death") instanceof List<?> deaths && !deaths.isEmpty()
                        && deaths.get(0) instanceof Map<?, ?> first
                        && first.get("count") instanceof Number n) {
                    retries = n.longValue();
                }

                if (retries > config.maxRetries()) {
                    System.err.println("Message " + messageId + " exceeded max retries. Moving to DLQ.");
                    // Reject without requeue so the broker dead-letters the message
                    channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
                    return;
                }

                handler.accept(content);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println("Processed order " + content.get("orderId") + " successfully");

            } catch (Exception e) {
                System.err.println("Error processing message " + messageId + ": " + e.getMessage());
                try {
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                } catch (IOException ex) { ex.printStackTrace(); }
            }
        }, consumerTag -> {});

        System.out.println("Consumer listening on queue: " + config.queueName());
    }

    public void close() throws IOException, TimeoutException {
        if (channel != null) channel.close();
        if (connection != null) connection.close();
    }
}

# rabbitmq_consumer.py
import aio_pika
import json
import asyncio
from dataclasses import dataclass
from typing import Callable, Awaitable


@dataclass
class ProcessorConfig:
    exchange_name: str
    queue_name: str
    routing_patterns: list[str]
    prefetch_count: int
    max_retries: int


class RabbitMQConsumer:
    def __init__(self, config: ProcessorConfig):
        self.config = config
        self.connection = None
        self.channel = None

    async def connect(self, url: str = "amqp://localhost") -> None:
        self.connection = await aio_pika.connect_robust(url)
        self.channel = await self.connection.channel()

        await self.channel.set_qos(prefetch_count=self.config.prefetch_count)

        exchange = await self.channel.declare_exchange(
            self.config.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
        )

        queue = await self.channel.declare_queue(
            self.config.queue_name,
            durable=True,
            arguments={"x-dead-letter-exchange": f"{self.config.exchange_name}-dlx"},
        )

        for pattern in self.config.routing_patterns:
            await queue.bind(exchange, routing_key=pattern)

        self.queue = queue

    async def consume(self, handler: Callable[[dict], Awaitable[None]]) -> None:
        if not self.channel:
            raise RuntimeError("Consumer not connected")

        async def process_message(message: aio_pika.IncomingMessage):
            message_id = message.message_id
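            # ignore_processed lets us reject explicitly inside the context manager
            async with message.process(requeue=True, ignore_processed=True):
                try:
                    content = json.loads(message.body.decode())

                    # Read the dead-letter count from the first x-death entry
                    deaths = (message.headers or {}).get("x-death") or []
                    retries = deaths[0].get("count", 0) if deaths else 0

                    if retries > self.config.max_retries:
                        print(f"Message {message_id} exceeded max retries. Moving to DLQ.")
                        # Reject without requeue so the broker dead-letters the message
                        await message.reject(requeue=False)
                        return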

                    await asyncio.wait_for(handler(content), timeout=30)
                    print(f"Processed order {content.get('order_id')} successfully")

                except asyncio.TimeoutError:
                    print(f"Processing timeout for message {message_id}")
                    raise
                except Exception as e:
                    print(f"Error processing message {message_id}: {e}")
                    raise

        await self.queue.consume(process_message)
        print(f"Consumer listening on queue: {self.config.queue_name}")

    async def close(self) -> None:
        if self.connection:
            await self.connection.close()


# Usage
async def main():
    consumer = RabbitMQConsumer(ProcessorConfig(
        exchange_name="orders",
        queue_name="order-processing-queue",
        routing_patterns=["orders.*.created"],
        prefetch_count=1,
        max_retries=3,
    ))

    await consumer.connect("amqp://guest:guest@localhost")

    async def handle_order(order: dict):
        print(f"Processing order {order['order_id']} for customer {order['customer_id']}")
        if order["total"] < 0:
            raise ValueError("Invalid order amount")
        await asyncio.sleep(1)

    await consumer.consume(handle_order)
    await asyncio.Future()  # Run forever

asyncio.run(main())

// RabbitMQConsumer.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Text.Json;

public record ProcessorConfig(
    string ExchangeName, string QueueName,
    string[] RoutingPatterns, int PrefetchCount, int MaxRetries);

public class RabbitMQConsumer : IDisposable
{
    private IConnection? _connection;
    private IModel? _channel;
    private readonly ProcessorConfig _config;

    public RabbitMQConsumer(ProcessorConfig config) { _config = config; }

    public void Connect(string url = "amqp://localhost")
    {
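        // AsyncEventingBasicConsumer only receives deliveries when async dispatch is enabled
        var factory = new ConnectionFactory { Uri = new Uri(url), DispatchConsumersAsync = true };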
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.BasicQos(0, (ushort)_config.PrefetchCount, false);

        _channel.ExchangeDeclare(_config.ExchangeName, ExchangeType.Topic, durable: true);
        _channel.QueueDeclare(_config.QueueName, durable: true, exclusive: false, autoDelete: false,
            arguments: new Dictionary<string, object>
            {
                ["x-dead-letter-exchange"] = $"{_config.ExchangeName}-dlx"
            });

        foreach (var pattern in _config.RoutingPatterns)
            _channel.QueueBind(_config.QueueName, _config.ExchangeName, pattern);
    }

    public void Consume(Func<Dictionary<string, object>, Task> handler)
    {
        if (_channel == null) throw new InvalidOperationException("Consumer not connected");

        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += async (_, ea) =>
        {
            var messageId = ea.BasicProperties.MessageId;
            try
            {
                var content = JsonSerializer.Deserialize<Dictionary<string, object>>(
                    Encoding.UTF8.GetString(ea.Body.ToArray()))!;

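                // Read the dead-letter count from the first x-death entry
                var headers = ea.BasicProperties.Headers;
                long retries = 0;
                if (headers != null && headers.TryGetValue("x-death", out var xDeath)
                    && xDeath is List<object> { Count: > 0 } deaths
                    && deaths[0] is Dictionary<string, object> first
                    && first.TryGetValue("count", out var count))
                {
                    retries = Convert.ToInt64(count);
                }

                if (retries > _config.MaxRetries)
                {
                    Console.Error.WriteLine($"Message {messageId} exceeded max retries. Moving to DLQ.");
                    // Reject without requeue so the broker dead-letters the message
                    _channel.BasicReject(ea.DeliveryTag, requeue: false);
                    return;
                }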

                using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
                await handler(content).WaitAsync(cts.Token);

                _channel.BasicAck(ea.DeliveryTag, false);
                Console.WriteLine($"Processed order {content.GetValueOrDefault("orderId")} successfully");
            }
            catch (Exception e)
            {
                Console.Error.WriteLine($"Error processing message {messageId}: {e.Message}");
                _channel.BasicNack(ea.DeliveryTag, false, requeue: true);
            }
        };

        _channel.BasicConsume(_config.QueueName, autoAck: false, consumer: consumer);
        Console.WriteLine($"Consumer listening on queue: {_config.QueueName}");
    }

    public void Dispose()
    {
        _channel?.Close();
        _connection?.Close();
    }
}
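
The consumers above estimate retries from the `x-death` header. As a rough sketch, and assuming the header layout RabbitMQ documents for dead-lettered messages (a list of entries, each with `queue`, `reason`, and `count` fields), the retry count for one queue can be read as shown below. The helper name `dead_letter_count` and the `order-retry-queue` name in the sample data are hypothetical.

```python
def dead_letter_count(headers, queue_name, reason="rejected"):
    """Sum the 'count' fields of the x-death entries for one queue and reason."""
    deaths = (headers or {}).get("x-death") or []
    return sum(
        int(entry.get("count", 0))
        for entry in deaths
        if entry.get("queue") == queue_name and entry.get("reason") == reason
    )


# Sample headers shaped like those on a message that was rejected twice
# and expired twice in a (hypothetical) retry queue.
sample_headers = {
    "x-death": [
        {"queue": "order-processing-queue", "reason": "rejected", "count": 2},
        {"queue": "order-retry-queue", "reason": "expired", "count": 2},
    ]
}

rejections = dead_letter_count(sample_headers, "order-processing-queue")
```

Counting per queue and reason avoids mistaking time spent in a retry queue for failed processing attempts. Keep in mind that the header is only updated when a message is actually dead-lettered; a plain requeue leaves it untouched.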

Amazon SQS: Managed Queue Service

Amazon SQS offers two queue types: Standard and FIFO.

Standard vs FIFO Queues

Standard Queues:

  • Nearly unlimited throughput
  • At-least-once delivery
  • Best-effort ordering (FIFO is not guaranteed)
  • Lower latency
  • Use for: event notifications, loosely ordered work

FIFO Queues:

  • Ordered delivery (strict FIFO within each message group)
  • Exactly-once processing
  • Limited to 300 API calls per second per API action by default (higher with batching or high-throughput mode)
  • Higher latency because of the ordering guarantee
  • Use for: critical workflows, sequential processing

Visibility Timeout and Acknowledgment

When a consumer receives a message from SQS, the message becomes invisible to other consumers for the duration of the visibility timeout (30 seconds by default). If the consumer does not delete the message before the timeout expires, SQS makes it visible again so that it can be redelivered.
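
The visibility timeout is easier to reason about with a small simulation. The sketch below is a toy model, not the SQS API: the class `VisibilityQueue` is hypothetical, time is passed in explicitly so the behaviour is deterministic, and messages are deleted by ID, whereas SQS deletes by receipt handle.

```python
class VisibilityQueue:
    """Toy model of a visibility timeout; 'now' is supplied by the caller."""

    def __init__(self, visibility_timeout=30):
        self.visibility_timeout = visibility_timeout
        self._messages = {}          # message id -> body
        self._invisible_until = {}   # message id -> time at which it is visible again

    def send(self, message_id, body):
        self._messages[message_id] = body
        self._invisible_until[message_id] = 0

    def receive(self, now):
        """Return the first visible message and hide it for the timeout period."""
        for message_id, body in self._messages.items():
            if self._invisible_until[message_id] <= now:
                self._invisible_until[message_id] = now + self.visibility_timeout
                return message_id, body
        return None

    def delete(self, message_id):
        """The consumer's acknowledgment: remove the message for good."""
        self._messages.pop(message_id, None)
        self._invisible_until.pop(message_id, None)


q = VisibilityQueue(visibility_timeout=30)
q.send("m1", "process_order")

first = q.receive(now=0)     # delivered and hidden until t=30
hidden = q.receive(now=10)   # nothing visible: the message is still in flight
again = q.receive(now=30)    # redelivered because it was never deleted
q.delete("m1")
gone = q.receive(now=100)    # nothing left after deletion
```

This is why the processing time budget should stay below the visibility timeout: a handler that runs past it will see its message handed to another consumer.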

import { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';

interface OrderPayload {
  orderId: string;
  amount: number;
}

class SQSMessageQueue {
  private client: SQSClient;
  private queueUrl: string;

  constructor(region: string, queueUrl: string) {
    this.client = new SQSClient({ region });
    this.queueUrl = queueUrl;
  }

  async sendMessage(payload: OrderPayload, delaySeconds: number = 0): Promise<string> {
    const command = new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(payload),
      DelaySeconds: delaySeconds,
      MessageAttributes: {
        'OrderId': {
          StringValue: payload.orderId,
          DataType: 'String',
        },
        'Timestamp': {
          StringValue: new Date().toISOString(),
          DataType: 'String',
        },
      },
    });

    const response = await this.client.send(command);
    return response.MessageId!;
  }

  async receiveMessages(maxMessages: number = 10, visibilityTimeout: number = 30) {
    const command = new ReceiveMessageCommand({
      QueueUrl: this.queueUrl,
      MaxNumberOfMessages: maxMessages,
      VisibilityTimeout: visibilityTimeout,
      WaitTimeSeconds: 20, // Long polling
      MessageAttributeNames: ['All'],
    });

    const response = await this.client.send(command);
    return response.Messages || [];
  }

  async deleteMessage(receiptHandle: string): Promise<void> {
    const command = new DeleteMessageCommand({
      QueueUrl: this.queueUrl,
      ReceiptHandle: receiptHandle,
    });

    await this.client.send(command);
  }
}

// Consumer implementation
class SQSConsumer {
  private queue: SQSMessageQueue;
  private isRunning: boolean = false;

  constructor(queue: SQSMessageQueue) {
    this.queue = queue;
  }

  async start(handler: (payload: OrderPayload) => Promise<void>): Promise<void> {
    this.isRunning = true;

    while (this.isRunning) {
      try {
        const messages = await this.queue.receiveMessages(10, 60);

        if (messages.length === 0) {
          continue;
        }

        for (const message of messages) {
          try {
            const payload = JSON.parse(message.Body!) as OrderPayload;

            // Process with timeout
            await Promise.race([
              handler(payload),
              new Promise((_, reject) =>
                setTimeout(
                  () => reject(new Error('Processing timeout')),
                  50000 // 50 second timeout (less than 60 second visibility)
                )
              ),
            ]);

            // Delete message on success
            await this.queue.deleteMessage(message.ReceiptHandle!);
            console.log(`Processed order ${payload.orderId}`);
          } catch (error) {
            console.error(
              `Error processing message ${message.MessageId}:`,
              error
            );
            // Don't delete; message will reappear after visibility timeout
          }
        }
      } catch (error) {
        console.error('Error in consumer loop:', error);
        await new Promise((resolve) => setTimeout(resolve, 5000));
      }
    }
  }

  stop(): void {
    this.isRunning = false;
  }
}

// SQSMessageQueue.java
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
import software.amazon.awssdk.regions.Region;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.util.Map;

public class SQSMessageQueue {
    private final SqsClient client;
    private final String queueUrl;
    private static final ObjectMapper mapper = new ObjectMapper();

    public record OrderPayload(String orderId, double amount) {}

    public SQSMessageQueue(String region, String queueUrl) {
        this.client = SqsClient.builder().region(Region.of(region)).build();
        this.queueUrl = queueUrl;
    }

    public String sendMessage(OrderPayload payload, int delaySeconds) throws Exception {
        SendMessageResponse response = client.sendMessage(SendMessageRequest.builder()
            .queueUrl(queueUrl)
            .messageBody(mapper.writeValueAsString(payload))
            .delaySeconds(delaySeconds)
            .messageAttributes(Map.of(
                "OrderId", MessageAttributeValue.builder().stringValue(payload.orderId()).dataType("String").build(),
                "Timestamp", MessageAttributeValue.builder().stringValue(java.time.Instant.now().toString()).dataType("String").build()
            ))
            .build());
        return response.messageId();
    }

    public List<Message> receiveMessages(int maxMessages, int visibilityTimeout) {
        ReceiveMessageResponse response = client.receiveMessage(ReceiveMessageRequest.builder()
            .queueUrl(queueUrl)
            .maxNumberOfMessages(maxMessages)
            .visibilityTimeout(visibilityTimeout)
            .waitTimeSeconds(20) // Long polling
            .messageAttributeNames("All")
            .build());
        return response.messages();
    }

    public void deleteMessage(String receiptHandle) {
        client.deleteMessage(DeleteMessageRequest.builder()
            .queueUrl(queueUrl)
            .receiptHandle(receiptHandle)
            .build());
    }
}

// SQSConsumer.java
class SQSConsumer {
    private final SQSMessageQueue queue;
    private volatile boolean isRunning = false;

    public SQSConsumer(SQSMessageQueue queue) { this.queue = queue; }

    public void start(java.util.function.Consumer<SQSMessageQueue.OrderPayload> handler) {
        isRunning = true;

        while (isRunning) {
            try {
                List<Message> messages = queue.receiveMessages(10, 60);

                for (Message message : messages) {
                    try {
                        SQSMessageQueue.OrderPayload payload = new ObjectMapper()
                            .readValue(message.body(), SQSMessageQueue.OrderPayload.class);
                        handler.accept(payload);
                        queue.deleteMessage(message.receiptHandle());
                        System.out.println("Processed order " + payload.orderId());
                    } catch (Exception e) {
                        System.err.println("Error processing message " + message.messageId() + ": " + e.getMessage());
                        // Don't delete; message will reappear after visibility timeout
                    }
                }
            } catch (Exception e) {
                System.err.println("Error in consumer loop: " + e.getMessage());
                try { Thread.sleep(5000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
            }
        }
    }

    public void stop() { isRunning = false; }
}
# sqs_message_queue.py
import boto3
import json
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Awaitable


@dataclass
class OrderPayload:
    order_id: str
    amount: float


class SQSMessageQueue:
    def __init__(self, region: str, queue_url: str):
        self.client = boto3.client("sqs", region_name=region)
        self.queue_url = queue_url

    def send_message(self, payload: OrderPayload, delay_seconds: int = 0) -> str:
        response = self.client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps({"order_id": payload.order_id, "amount": payload.amount}),
            DelaySeconds=delay_seconds,
            MessageAttributes={
                "OrderId": {"StringValue": payload.order_id, "DataType": "String"},
                "Timestamp": {"StringValue": datetime.now(timezone.utc).isoformat(), "DataType": "String"},
            },
        )
        return response["MessageId"]

    def receive_messages(self, max_messages: int = 10, visibility_timeout: int = 30) -> list:
        response = self.client.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=max_messages,
            VisibilityTimeout=visibility_timeout,
            WaitTimeSeconds=20,  # Long polling
            MessageAttributeNames=["All"],
        )
        return response.get("Messages", [])

    def delete_message(self, receipt_handle: str) -> None:
        self.client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=receipt_handle)


class SQSConsumer:
    def __init__(self, queue: SQSMessageQueue):
        self.queue = queue
        self.is_running = False

    async def start(self, handler: Callable[[OrderPayload], Awaitable[None]]) -> None:
        self.is_running = True

        while self.is_running:
            try:
                # boto3 calls block, so run them in a worker thread to keep the event loop free
                messages = await asyncio.to_thread(self.queue.receive_messages, 10, 60)

                for message in messages:
                    try:
                        data = json.loads(message["Body"])
                        payload = OrderPayload(order_id=data["order_id"], amount=data["amount"])

                        await asyncio.wait_for(handler(payload), timeout=50)

                        await asyncio.to_thread(self.queue.delete_message, message["ReceiptHandle"])
                        print(f"Processed order {payload.order_id}")
                    except Exception as e:
                        print(f"Error processing message {message['MessageId']}: {e}")
                        # Don't delete; message will reappear after visibility timeout

            except Exception as e:
                print(f"Error in consumer loop: {e}")
                await asyncio.sleep(5)

    def stop(self) -> None:
        self.is_running = False
// SQSMessageQueue.cs
using Amazon.SQS;
using Amazon.SQS.Model;
using System.Text.Json;

public record OrderPayload(string OrderId, double Amount);

public class SQSMessageQueue
{
    private readonly AmazonSQSClient _client;
    private readonly string _queueUrl;

    public SQSMessageQueue(string region, string queueUrl)
    {
        _client = new AmazonSQSClient(Amazon.RegionEndpoint.GetBySystemName(region));
        _queueUrl = queueUrl;
    }

    public async Task<string> SendMessageAsync(OrderPayload payload, int delaySeconds = 0)
    {
        var response = await _client.SendMessageAsync(new SendMessageRequest
        {
            QueueUrl = _queueUrl,
            MessageBody = JsonSerializer.Serialize(payload),
            DelaySeconds = delaySeconds,
            MessageAttributes = new Dictionary<string, MessageAttributeValue>
            {
                ["OrderId"] = new() { StringValue = payload.OrderId, DataType = "String" },
                ["Timestamp"] = new() { StringValue = DateTime.UtcNow.ToString("O"), DataType = "String" },
            }
        });
        return response.MessageId;
    }

    public async Task<List<Message>> ReceiveMessagesAsync(int maxMessages = 10, int visibilityTimeout = 30)
    {
        var response = await _client.ReceiveMessageAsync(new ReceiveMessageRequest
        {
            QueueUrl = _queueUrl,
            MaxNumberOfMessages = maxMessages,
            VisibilityTimeout = visibilityTimeout,
            WaitTimeSeconds = 20,
            MessageAttributeNames = new List<string> { "All" },
        });
        return response.Messages;
    }

    public async Task DeleteMessageAsync(string receiptHandle) =>
        await _client.DeleteMessageAsync(_queueUrl, receiptHandle);
}

public class SQSConsumer
{
    private readonly SQSMessageQueue _queue;
    private volatile bool _isRunning; // Stop() may be called from another thread

    public SQSConsumer(SQSMessageQueue queue) { _queue = queue; }

    public async Task StartAsync(Func<OrderPayload, Task> handler)
    {
        _isRunning = true;

        while (_isRunning)
        {
            try
            {
                var messages = await _queue.ReceiveMessagesAsync(10, 60);

                foreach (var message in messages)
                {
                    try
                    {
                        var payload = JsonSerializer.Deserialize<OrderPayload>(message.Body)!;

                        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(50));
                        await handler(payload).WaitAsync(cts.Token);

                        await _queue.DeleteMessageAsync(message.ReceiptHandle);
                        Console.WriteLine($"Processed order {payload.OrderId}");
                    }
                    catch (Exception e)
                    {
                        Console.Error.WriteLine($"Error processing message {message.MessageId}: {e.Message}");
                        // Don't delete; message will reappear after visibility timeout
                    }
                }
            }
            catch (Exception e)
            {
                Console.Error.WriteLine($"Error in consumer loop: {e.Message}");
                await Task.Delay(5000);
            }
        }
    }

    public void Stop() => _isRunning = false;
}
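
The consumers above fetch up to 10 messages with a 60-second visibility timeout and then handle them one at a time (the Python and C# versions give each handler up to 50 seconds). The visibility clock starts at receive time for the whole batch, so messages late in the batch can become visible again, and be redelivered, before they are processed. The sketch below works through that arithmetic. The helper names are hypothetical and not part of any SDK, and the bound ignores the time spent on the delete call.

```python
def worst_case_batch_seconds(batch_size: int, handler_timeout_s: float) -> float:
    """Upper bound on the time to finish a batch handled one message at a time."""
    return batch_size * handler_timeout_s


def max_safe_batch_size(visibility_timeout_s: float, handler_timeout_s: float) -> int:
    """Largest batch whose worst-case sequential processing fits in the visibility timeout."""
    return int(visibility_timeout_s // handler_timeout_s)


# Values used by the consumers above: 10 messages, 60 s visibility, 50 s per handler.
print(worst_case_batch_seconds(10, 50))  # 500
print(max_safe_batch_size(60, 50))       # 1
```

With these numbers only the first message is guaranteed to finish inside the timeout. Typical remedies are a smaller batch, a longer visibility timeout, or extending the timeout per message with the SQS ChangeMessageVisibility call.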

Message Patterns

Work Queue Pattern

Multiple consumers process work from a single queue in parallel. Each message goes to exactly one of them.

// Producer
const queue = new SQSMessageQueue('us-east-1', 'https://sqs.us-east-1.amazonaws.com/123456789/task-queue');

for (let i = 0; i < 100; i++) {
  await queue.sendMessage({
    orderId: `TASK-${i}`,
    amount: Math.random() * 1000,
  });
}

// Multiple consumers
const consumer = new SQSConsumer(queue);

const handler = async (payload: OrderPayload) => {
  console.log(`Processing ${payload.orderId}`);
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 5000));
};

// Start 5 parallel consumers
await Promise.all([...Array(5)].map(() => consumer.start(handler)));
// Work Queue Pattern - Producer
SQSMessageQueue queue = new SQSMessageQueue("us-east-1",
    "https://sqs.us-east-1.amazonaws.com/123456789/task-queue");

for (int i = 0; i < 100; i++) {
    queue.sendMessage(new SQSMessageQueue.OrderPayload("TASK-" + i, Math.random() * 1000), 0);
}

// Work Queue Pattern - Multiple consumers via thread pool
SQSConsumer consumer = new SQSConsumer(queue);

java.util.function.Consumer<SQSMessageQueue.OrderPayload> handler = payload -> {
    System.out.println("Processing " + payload.orderId());
    try { Thread.sleep((long)(Math.random() * 5000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
};

// Start 5 parallel consumers
java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
    executor.submit(() -> consumer.start(handler));
}
# Work Queue Pattern - Producer
import asyncio
import random

queue = SQSMessageQueue("us-east-1",
    "https://sqs.us-east-1.amazonaws.com/123456789/task-queue")

for i in range(100):
    queue.send_message(OrderPayload(order_id=f"TASK-{i}", amount=random.random() * 1000))

# Multiple consumers
consumer = SQSConsumer(queue)

async def handler(payload: OrderPayload):
    print(f"Processing {payload.order_id}")
    await asyncio.sleep(random.random() * 5)

# Start 5 parallel consumers
async def main():
    await asyncio.gather(*[consumer.start(handler) for _ in range(5)])

asyncio.run(main())
// Work Queue Pattern - Producer
var queue = new SQSMessageQueue("us-east-1",
    "https://sqs.us-east-1.amazonaws.com/123456789/task-queue");

for (int i = 0; i < 100; i++)
    await queue.SendMessageAsync(new OrderPayload($"TASK-{i}", Random.Shared.NextDouble() * 1000));

// Multiple consumers
var consumer = new SQSConsumer(queue);

Func<OrderPayload, Task> handler = async payload =>
{
    Console.WriteLine($"Processing {payload.OrderId}");
    await Task.Delay((int)(Random.Shared.NextDouble() * 5000));
};

// Start 5 parallel consumers
await Task.WhenAll(Enumerable.Range(0, 5).Select(_ => consumer.StartAsync(handler)));

Pub/Sub Fanout Pattern

A single exchange routes each message to several queues, and each queue is consumed by a different service.

// In RabbitMQ, create fanout exchange
async function setupFanout(channel: amqp.Channel) {
  const exchangeName = 'user-events-fanout';
  
  // Fanout: broadcast to all bound queues
  await channel.assertExchange(exchangeName, 'fanout', { durable: true });

  // Email service queue
  await channel.assertQueue('email-queue', { durable: true });
  await channel.bindQueue('email-queue', exchangeName, '');

  // Analytics queue
  await channel.assertQueue('analytics-queue', { durable: true });
  await channel.bindQueue('analytics-queue', exchangeName, '');

  // Notification queue
  await channel.assertQueue('notification-queue', { durable: true });
  await channel.bindQueue('notification-queue', exchangeName, '');

  return exchangeName;
}

// Single user registration message triggers three consumers
const exchangeName = await setupFanout(channel);
await channel.publish(
  exchangeName,
  '', // Fanout ignores routing key
  Buffer.from(JSON.stringify({ userId: 'user-123', email: 'user@example.com' })),
  { persistent: true }
);
// Pub/Sub Fanout Pattern
import com.rabbitmq.client.Channel;
import com.fasterxml.jackson.databind.ObjectMapper;

public String setupFanout(Channel channel) throws Exception {
    String exchangeName = "user-events-fanout";

    // Fanout: broadcast to all bound queues
    channel.exchangeDeclare(exchangeName, "fanout", true);

    channel.queueDeclare("email-queue", true, false, false, null);
    channel.queueBind("email-queue", exchangeName, "");

    channel.queueDeclare("analytics-queue", true, false, false, null);
    channel.queueBind("analytics-queue", exchangeName, "");

    channel.queueDeclare("notification-queue", true, false, false, null);
    channel.queueBind("notification-queue", exchangeName, "");

    return exchangeName;
}

// Publish fanout message
String exchangeName = setupFanout(channel);
byte[] body = new ObjectMapper().writeValueAsBytes(
    java.util.Map.of("userId", "user-123", "email", "user@example.com")
);
com.rabbitmq.client.AMQP.BasicProperties props =
    new com.rabbitmq.client.AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish(exchangeName, "", props, body); // Fanout ignores routing key
# Pub/Sub Fanout Pattern
import aio_pika
import json


async def setup_fanout(channel: aio_pika.Channel) -> aio_pika.Exchange:
    exchange_name = "user-events-fanout"

    # Fanout: broadcast to all bound queues
    exchange = await channel.declare_exchange(
        exchange_name, aio_pika.ExchangeType.FANOUT, durable=True
    )

    for queue_name in ("email-queue", "analytics-queue", "notification-queue"):
        queue = await channel.declare_queue(queue_name, durable=True)
        await queue.bind(exchange, routing_key="")  # Fanout ignores routing key

    return exchange


# Publish fanout message
exchange = await setup_fanout(channel)
await exchange.publish(
    aio_pika.Message(
        body=json.dumps({"user_id": "user-123", "email": "user@example.com"}).encode(),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
    ),
    routing_key="",  # Fanout ignores routing key
)
// Pub/Sub Fanout Pattern
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

static string SetupFanout(IModel channel)
{
    const string exchangeName = "user-events-fanout";

    // Fanout: broadcast to all bound queues
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, durable: true);

    foreach (var queueName in new[] { "email-queue", "analytics-queue", "notification-queue" })
    {
        channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(queueName, exchangeName, routingKey: ""); // Fanout ignores routing key
    }

    return exchangeName;
}

// Publish fanout message
var exchangeName = SetupFanout(channel);
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { userId = "user-123", email = "user@example.com" }));
var props = channel.CreateBasicProperties();
props.Persistent = true;
channel.BasicPublish(exchangeName, routingKey: "", props, body); // Fanout ignores routing key

Topic-Based Routing Pattern

Route messages by hierarchical routing keys (for example, orders.processing.urgent or orders.completed.standard). In a binding key, * matches exactly one word and # matches zero or more words.

async function setupTopicRouting(channel: amqp.Channel) {
  const exchangeName = 'orders-topic';
  
  await channel.assertExchange(exchangeName, 'topic', { durable: true });

  // Queue for all order updates
  await channel.assertQueue('all-orders-queue', { durable: true });
  await channel.bindQueue('all-orders-queue', exchangeName, 'orders.#'); // All events

  // Queue for urgent orders only
  await channel.assertQueue('urgent-orders-queue', { durable: true });
  await channel.bindQueue('urgent-orders-queue', exchangeName, 'orders.*.urgent');

  // Queue for processing orders
  await channel.assertQueue('processing-queue', { durable: true });
  await channel.bindQueue('processing-queue', exchangeName, 'orders.processing.*');

  return exchangeName;
}

// Publish with topic routing
await channel.publish(
  'orders-topic',
  'orders.processing.urgent',
  Buffer.from(JSON.stringify({ orderId: '12345', priority: 'urgent' })),
  { persistent: true }
);

// 'urgent-orders-queue' and 'processing-queue' both receive this message
// 'all-orders-queue' also receives it
// Topic-Based Routing Pattern
import com.rabbitmq.client.Channel;
import com.fasterxml.jackson.databind.ObjectMapper;

public String setupTopicRouting(Channel channel) throws Exception {
    String exchangeName = "orders-topic";

    channel.exchangeDeclare(exchangeName, "topic", true);

    channel.queueDeclare("all-orders-queue", true, false, false, null);
    channel.queueBind("all-orders-queue", exchangeName, "orders.#"); // All events

    channel.queueDeclare("urgent-orders-queue", true, false, false, null);
    channel.queueBind("urgent-orders-queue", exchangeName, "orders.*.urgent");

    channel.queueDeclare("processing-queue", true, false, false, null);
    channel.queueBind("processing-queue", exchangeName, "orders.processing.*");

    return exchangeName;
}

// Publish with topic routing
byte[] body = new ObjectMapper().writeValueAsBytes(
    java.util.Map.of("orderId", "12345", "priority", "urgent")
);
com.rabbitmq.client.AMQP.BasicProperties props =
    new com.rabbitmq.client.AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish("orders-topic", "orders.processing.urgent", props, body);
// 'urgent-orders-queue' and 'processing-queue' both receive this
// 'all-orders-queue' also receives it
# Topic-Based Routing Pattern
import aio_pika
import json


async def setup_topic_routing(channel: aio_pika.Channel) -> aio_pika.Exchange:
    exchange_name = "orders-topic"

    exchange = await channel.declare_exchange(
        exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
    )

    bindings = [
        ("all-orders-queue", "orders.#"),       # All events
        ("urgent-orders-queue", "orders.*.urgent"),
        ("processing-queue", "orders.processing.*"),
    ]

    for queue_name, routing_key in bindings:
        queue = await channel.declare_queue(queue_name, durable=True)
        await queue.bind(exchange, routing_key=routing_key)

    return exchange


# Publish with topic routing
exchange = await setup_topic_routing(channel)
await exchange.publish(
    aio_pika.Message(
        body=json.dumps({"order_id": "12345", "priority": "urgent"}).encode(),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
    ),
    routing_key="orders.processing.urgent",
)
# 'urgent-orders-queue' and 'processing-queue' both receive this
# 'all-orders-queue' also receives it
// Topic-Based Routing Pattern
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

static string SetupTopicRouting(IModel channel)
{
    const string exchangeName = "orders-topic";

    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, durable: true);

    var bindings = new[]
    {
        ("all-orders-queue",      "orders.#"),           // All events
        ("urgent-orders-queue",   "orders.*.urgent"),
        ("processing-queue",      "orders.processing.*"),
    };

    foreach (var (queueName, routingKey) in bindings)
    {
        channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(queueName, exchangeName, routingKey);
    }

    return exchangeName;
}

// Publish with topic routing
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { orderId = "12345", priority = "urgent" }));
var props = channel.CreateBasicProperties();
props.Persistent = true;
channel.BasicPublish("orders-topic", "orders.processing.urgent", props, body);
// 'urgent-orders-queue' and 'processing-queue' both receive this
// 'all-orders-queue' also receives it
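
To see why all three queues receive that message, it can help to model the matching rule directly. The following is a minimal sketch of AMQP-style topic matching (`*` matches exactly one word, `#` matches zero or more words). `topic_matches` and `queues_for` are illustrative names, not part of any RabbitMQ client, and a real broker uses its own optimized implementation.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if a dot-separated routing key satisfies a topic binding key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' absorbs zero words (advance the pattern) or one more word (advance the key)
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


# Same bindings as the setup functions above
bindings = {
    "all-orders-queue": "orders.#",
    "urgent-orders-queue": "orders.*.urgent",
    "processing-queue": "orders.processing.*",
}


def queues_for(routing_key: str) -> list:
    """List the queues whose binding key matches the routing key."""
    return [queue for queue, key in bindings.items() if topic_matches(key, routing_key)]


print(queues_for("orders.processing.urgent"))
# ['all-orders-queue', 'urgent-orders-queue', 'processing-queue']
print(queues_for("orders.completed.standard"))
# ['all-orders-queue']
```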

Dead-Letter Queues and Retry Strategies

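Dead-letter queues (DLQs) capture messages that still fail processing after the maximum number of retries. In RabbitMQ, a message is dead-lettered when the consumer rejects or nacks it with requeue disabled, when its TTL expires, or when the queue exceeds its length limit. The retry helper below therefore rethrows the final error so that the caller can reject the message.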

interface RetryConfig {
  maxRetries: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
}

class MessageProcessor {
  private config: RetryConfig;

  constructor(config: RetryConfig) {
    this.config = config;
  }

  async setupRabbitMQDLQ(channel: amqp.Channel): Promise<void> {
    const mainExchange = 'orders';
    const dlxExchange = `${mainExchange}-dlx`;
    const dlqName = 'orders-dlq';

    // Dead-letter exchange
    await channel.assertExchange(dlxExchange, 'direct', { durable: true });

    // Dead-letter queue
    await channel.assertQueue(dlqName, { durable: true });
    await channel.bindQueue(dlqName, dlxExchange, 'orders');

    // Main queue with DLX configured
    await channel.assertQueue('orders-queue', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': dlxExchange,
        'x-dead-letter-routing-key': 'orders',
        'x-max-length': 1000000, // Prevent unbounded growth
      },
    });
  }

  calculateBackoffDelay(retryCount: number): number {
    const exponentialDelay = this.config.initialDelayMs * 
      Math.pow(this.config.backoffMultiplier, retryCount);
    
    return Math.min(exponentialDelay, this.config.maxDelayMs);
  }

  async processWithRetry<T>(
    handler: (data: T) => Promise<void>,
    data: T,
    retryCount: number = 0
  ): Promise<void> {
    try {
      await handler(data);
    } catch (error) {
      if (retryCount < this.config.maxRetries) {
        const delayMs = this.calculateBackoffDelay(retryCount);
        console.log(
          `Retry ${retryCount + 1}/${this.config.maxRetries} after ${delayMs}ms`
        );
        
        await new Promise((resolve) => setTimeout(resolve, delayMs));
        await this.processWithRetry(handler, data, retryCount + 1);
      } else {
        console.error(
          `Failed after ${this.config.maxRetries} retries. Moving to DLQ.`
        );
        throw error;
      }
    }
  }
}

// Usage
const processor = new MessageProcessor({
  maxRetries: 3,
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffMultiplier: 2,
});

await processor.processWithRetry(
  async (order: OrderMessage) => {
    // Process logic
    if (Math.random() < 0.3) throw new Error('Temporary failure');
    console.log('Order processed:', order.orderId);
  },
  { orderId: 'ORD-123', customerId: 'CUST-456', total: 99.99, timestamp: new Date() }
);
// MessageProcessor.java
import com.rabbitmq.client.Channel;
import java.util.Map;
import java.util.function.Consumer;

public class MessageProcessor {

    public record RetryConfig(int maxRetries, long initialDelayMs, long maxDelayMs, double backoffMultiplier) {}

    private final RetryConfig config;

    public MessageProcessor(RetryConfig config) { this.config = config; }

    public void setupRabbitMQDLQ(Channel channel) throws Exception {
        String mainExchange = "orders";
        String dlxExchange = mainExchange + "-dlx";
        String dlqName = "orders-dlq";

        channel.exchangeDeclare(dlxExchange, "direct", true);
        channel.queueDeclare(dlqName, true, false, false, null);
        channel.queueBind(dlqName, dlxExchange, "orders");

        channel.queueDeclare("orders-queue", true, false, false, Map.of(
            "x-dead-letter-exchange", dlxExchange,
            "x-dead-letter-routing-key", "orders",
            "x-max-length", 1_000_000
        ));
    }

    public long calculateBackoffDelay(int retryCount) {
        long exponentialDelay = (long)(config.initialDelayMs() *
            Math.pow(config.backoffMultiplier(), retryCount));
        return Math.min(exponentialDelay, config.maxDelayMs());
    }

    public <T> void processWithRetry(Consumer<T> handler, T data, int retryCount) throws Exception {
        try {
            handler.accept(data);
        } catch (Exception e) {
            if (retryCount < config.maxRetries()) {
                long delayMs = calculateBackoffDelay(retryCount);
                System.out.printf("Retry %d/%d after %dms%n", retryCount + 1, config.maxRetries(), delayMs);
                Thread.sleep(delayMs);
                processWithRetry(handler, data, retryCount + 1);
            } else {
                System.err.printf("Failed after %d retries. Moving to DLQ.%n", config.maxRetries());
                throw e;
            }
        }
    }

    // Usage
    public static void main(String[] args) throws Exception {
        MessageProcessor processor = new MessageProcessor(
            new RetryConfig(3, 1000, 30000, 2.0)
        );

        processor.processWithRetry(order -> {
            if (Math.random() < 0.3) throw new RuntimeException("Temporary failure");
            System.out.println("Order processed: " + order);
        }, "ORD-123", 0);
    }
}
# message_processor.py
import asyncio
import random
from dataclasses import dataclass
from typing import Callable, TypeVar, Awaitable

import aio_pika

T = TypeVar("T")


@dataclass
class RetryConfig:
    max_retries: int
    initial_delay_ms: int
    max_delay_ms: int
    backoff_multiplier: float


class MessageProcessor:
    def __init__(self, config: RetryConfig):
        self.config = config

    async def setup_rabbitmq_dlq(self, channel: aio_pika.Channel) -> None:
        main_exchange = "orders"
        dlx_exchange = f"{main_exchange}-dlx"
        dlq_name = "orders-dlq"

        dead_exchange = await channel.declare_exchange(dlx_exchange, aio_pika.ExchangeType.DIRECT, durable=True)
        dlq = await channel.declare_queue(dlq_name, durable=True)
        await dlq.bind(dead_exchange, routing_key="orders")

        await channel.declare_queue("orders-queue", durable=True, arguments={
            "x-dead-letter-exchange": dlx_exchange,
            "x-dead-letter-routing-key": "orders",
            "x-max-length": 1_000_000,
        })

    def calculate_backoff_delay(self, retry_count: int) -> float:
        delay = self.config.initial_delay_ms * (self.config.backoff_multiplier ** retry_count)
        return min(delay, self.config.max_delay_ms) / 1000  # return seconds

    async def process_with_retry(
        self,
        handler: Callable[[T], Awaitable[None]],
        data: T,
        retry_count: int = 0,
    ) -> None:
        try:
            await handler(data)
        except Exception as e:
            if retry_count < self.config.max_retries:
                delay = self.calculate_backoff_delay(retry_count)
                print(f"Retry {retry_count + 1}/{self.config.max_retries} after {delay*1000:.0f}ms")
                await asyncio.sleep(delay)
                await self.process_with_retry(handler, data, retry_count + 1)
            else:
                print(f"Failed after {self.config.max_retries} retries. Moving to DLQ.")
                raise


# Usage
async def main():
    processor = MessageProcessor(RetryConfig(
        max_retries=3, initial_delay_ms=1000, max_delay_ms=30000, backoff_multiplier=2.0
    ))

    async def handle_order(order: dict):
        if random.random() < 0.3:
            raise ValueError("Temporary failure")
        print(f"Order processed: {order['order_id']}")

    await processor.process_with_retry(
        handle_order,
        {"order_id": "ORD-123", "customer_id": "CUST-456", "total": 99.99}
    )

asyncio.run(main())
// MessageProcessor.cs
using RabbitMQ.Client;

public record RetryConfig(int MaxRetries, int InitialDelayMs, int MaxDelayMs, double BackoffMultiplier);

public class MessageProcessor
{
    private readonly RetryConfig _config;

    public MessageProcessor(RetryConfig config) { _config = config; }

    public void SetupRabbitMQDLQ(IModel channel)
    {
        const string mainExchange = "orders";
        var dlxExchange = $"{mainExchange}-dlx";
        const string dlqName = "orders-dlq";

        channel.ExchangeDeclare(dlxExchange, ExchangeType.Direct, durable: true);
        channel.QueueDeclare(dlqName, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(dlqName, dlxExchange, "orders");

        channel.QueueDeclare("orders-queue", durable: true, exclusive: false, autoDelete: false,
            arguments: new Dictionary<string, object>
            {
                ["x-dead-letter-exchange"] = dlxExchange,
                ["x-dead-letter-routing-key"] = "orders",
                ["x-max-length"] = 1_000_000,
            });
    }

    public int CalculateBackoffDelay(int retryCount)
    {
        // Clamp as a double first so a large exponent cannot overflow the int cast
        var exponentialDelay = _config.InitialDelayMs * Math.Pow(_config.BackoffMultiplier, retryCount);
        return (int)Math.Min(exponentialDelay, _config.MaxDelayMs);
    }

    public async Task ProcessWithRetryAsync<T>(Func<T, Task> handler, T data, int retryCount = 0)
    {
        try
        {
            await handler(data);
        }
        catch (Exception) when (retryCount < _config.MaxRetries)
        {
            int delayMs = CalculateBackoffDelay(retryCount);
            Console.WriteLine($"Retry {retryCount + 1}/{_config.MaxRetries} after {delayMs}ms");
            await Task.Delay(delayMs);
            await ProcessWithRetryAsync(handler, data, retryCount + 1);
        }
        catch
        {
            Console.Error.WriteLine($"Failed after {_config.MaxRetries} retries. Moving to DLQ.");
            throw;
        }
    }
}

// Usage
var processor = new MessageProcessor(new RetryConfig(3, 1000, 30000, 2.0));

await processor.ProcessWithRetryAsync(async (order) =>
{
    if (Random.Shared.NextDouble() < 0.3) throw new Exception("Temporary failure");
    Console.WriteLine($"Order processed: {order}");
}, new { orderId = "ORD-123", customerId = "CUST-456", total = 99.99 });
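
As a quick check on the retry configuration used above (3 retries, 1000 ms initial delay, 30000 ms cap, multiplier 2), the delay schedule can be computed on its own. This is a small sketch with a hypothetical `backoff_schedule` helper. It mirrors the capped exponential formula from the processors above and adds no jitter.

```python
def backoff_schedule(max_retries, initial_delay_ms, max_delay_ms, backoff_multiplier):
    """Delay before each retry: initial * multiplier**attempt, capped at max_delay_ms."""
    return [
        min(initial_delay_ms * backoff_multiplier ** attempt, max_delay_ms)
        for attempt in range(max_retries)
    ]


print(backoff_schedule(3, 1000, 30000, 2))       # [1000, 2000, 4000]
print(sum(backoff_schedule(3, 1000, 30000, 2)))  # 7000
print(backoff_schedule(7, 1000, 30000, 2))
# [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

With `maxRetries` set to 3 the handler runs at most four times (one initial attempt plus three retries) and the waits add up to 7 seconds. Because the processor sleeps in-process, the message stays unacknowledged for that whole period, so the total should stay well below any visibility or consumer timeout.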

Idempotency: Handling Duplicate Messages

Idempotent consumers can safely process the same message multiple times without producing additional side effects.

interface IdempotencyStore {
  isProcessed(messageId: string): Promise<boolean>;
  // expiryMs is optional so callers can rely on the store's default TTL
  markProcessed(messageId: string, expiryMs?: number): Promise<void>;
}

class RedisIdempotencyStore implements IdempotencyStore {
  constructor(private redisClient: any) {}

  async isProcessed(messageId: string): Promise<boolean> {
    const key = `processed:${messageId}`;
    const result = await this.redisClient.get(key);
    return result !== null;
  }

  async markProcessed(messageId: string, expiryMs: number = 86400000): Promise<void> {
    const key = `processed:${messageId}`;
    await this.redisClient.setex(key, Math.ceil(expiryMs / 1000), '1');
  }
}

class IdempotentConsumer {
  constructor(
    private queue: RabbitMQConsumer,
    private idempotencyStore: IdempotencyStore
  ) {}

  async consume(
    handler: (message: OrderMessage) => Promise<void>
  ): Promise<void> {
    await this.queue.consume(async (message: OrderMessage) => {
      // Check idempotency
      const alreadyProcessed = await this.idempotencyStore.isProcessed(
        message.orderId
      );

      if (alreadyProcessed) {
        console.log(`Message ${message.orderId} already processed. Skipping.`);
        return; // Silently skip
      }

      // Process
      await handler(message);

      // Mark as processed
      await this.idempotencyStore.markProcessed(message.orderId);
    });
  }
}
// IdempotencyStore.java
import redis.clients.jedis.Jedis;

public interface IdempotencyStore {
    boolean isProcessed(String messageId);
    void markProcessed(String messageId, long expiryMs);
}

public class RedisIdempotencyStore implements IdempotencyStore {
    private final Jedis redis;

    public RedisIdempotencyStore(Jedis redis) { this.redis = redis; }

    @Override
    public boolean isProcessed(String messageId) {
        return redis.get("processed:" + messageId) != null;
    }

    @Override
    public void markProcessed(String messageId, long expiryMs) {
        redis.setex("processed:" + messageId, (int) Math.ceil(expiryMs / 1000.0), "1");
    }
}

public class IdempotentConsumer {
    private final RabbitMQConsumer queue;
    private final IdempotencyStore idempotencyStore;

    public IdempotentConsumer(RabbitMQConsumer queue, IdempotencyStore store) {
        this.queue = queue;
        this.idempotencyStore = store;
    }

    public void consume(java.util.function.Consumer<java.util.Map<String, Object>> handler) throws Exception {
        queue.consume(message -> {
            String orderId = (String) message.get("orderId");

            if (idempotencyStore.isProcessed(orderId)) {
                System.out.println("Message " + orderId + " already processed. Skipping.");
                return;
            }

            handler.accept(message);
            idempotencyStore.markProcessed(orderId, 86_400_000L);
        });
    }
}
# idempotency.py
import redis.asyncio as aioredis
from typing import Protocol, runtime_checkable


@runtime_checkable
class IdempotencyStore(Protocol):
    async def is_processed(self, message_id: str) -> bool: ...
    async def mark_processed(self, message_id: str, expiry_ms: int = 86_400_000) -> None: ...


class RedisIdempotencyStore:
    def __init__(self, redis_client: aioredis.Redis):
        self.redis = redis_client

    async def is_processed(self, message_id: str) -> bool:
        key = f"processed:{message_id}"
        return await self.redis.get(key) is not None

    async def mark_processed(self, message_id: str, expiry_ms: int = 86_400_000) -> None:
        key = f"processed:{message_id}"
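        # Round up so a sub-second expiry never becomes a 0-second TTL, which Redis rejects
        await self.redis.setex(key, max(1, (expiry_ms + 999) // 1000), "1")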


class IdempotentConsumer:
    def __init__(self, queue: RabbitMQConsumer, store: IdempotencyStore):
        self.queue = queue
        self.store = store

    async def consume(self, handler):
        async def idempotent_handler(message: dict):
            order_id = message.get("order_id") or message.get("orderId")

            if await self.store.is_processed(order_id):
                print(f"Message {order_id} already processed. Skipping.")
                return

            await handler(message)
            await self.store.mark_processed(order_id)

        await self.queue.consume(idempotent_handler)
// Idempotency.cs
using StackExchange.Redis;

public interface IIdempotencyStore
{
    Task<bool> IsProcessedAsync(string messageId);
    Task MarkProcessedAsync(string messageId, TimeSpan? expiry = null);
}

public class RedisIdempotencyStore : IIdempotencyStore
{
    private readonly IDatabase _redis;

    public RedisIdempotencyStore(IConnectionMultiplexer multiplexer)
    {
        _redis = multiplexer.GetDatabase();
    }

    public async Task<bool> IsProcessedAsync(string messageId)
    {
        return await _redis.KeyExistsAsync($"processed:{messageId}");
    }

    public async Task MarkProcessedAsync(string messageId, TimeSpan? expiry = null)
    {
        await _redis.StringSetAsync(
            $"processed:{messageId}", "1",
            expiry ?? TimeSpan.FromDays(1));
    }
}

public class IdempotentConsumer
{
    private readonly RabbitMQConsumer _queue;
    private readonly IIdempotencyStore _store;

    public IdempotentConsumer(RabbitMQConsumer queue, IIdempotencyStore store)
    {
        _queue = queue;
        _store = store;
    }

    public void Consume(Func<Dictionary<string, object>, Task> handler)
    {
        _queue.Consume(async message =>
        {
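            var orderId = message.GetValueOrDefault("orderId")?.ToString();
            if (string.IsNullOrEmpty(orderId))
            {
                // An empty fallback id would make every id-less message share one key
                throw new InvalidOperationException("Message has no orderId; cannot deduplicate");
            }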

            if (await _store.IsProcessedAsync(orderId))
            {
                Console.WriteLine($"Message {orderId} already processed. Skipping.");
                return;
            }

            await handler(message);
            await _store.MarkProcessedAsync(orderId);
        });
    }
}
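
The consumers above check whether an id was processed and mark it only after the handler returns. Because these are two separate steps, two deliveries of the same message that arrive close together could both pass the check. One possible tightening, sketched below under stated assumptions, is to claim the id atomically before handling and to release the claim if the handler fails. `InMemoryClaimStore`, `try_claim`, `release`, and `handle_once` are hypothetical names introduced for illustration; with Redis, the claim step would typically map to `SET key value NX EX seconds`.

```python
import asyncio
import time
from typing import Awaitable, Callable


class InMemoryClaimStore:
    """Hypothetical stand-in for Redis so the sketch runs without a server."""

    def __init__(self) -> None:
        self._expires_at: dict[str, float] = {}

    async def try_claim(self, message_id: str, ttl_seconds: float) -> bool:
        # The check and the write have no await between them, so they are
        # atomic within one event loop. Redis SET with NX and EX gives the
        # same guarantee across processes.
        now = time.monotonic()
        expiry = self._expires_at.get(message_id)
        if expiry is not None and expiry > now:
            return False  # someone else already holds an unexpired claim
        self._expires_at[message_id] = now + ttl_seconds
        return True

    async def release(self, message_id: str) -> None:
        self._expires_at.pop(message_id, None)


async def handle_once(
    store: InMemoryClaimStore,
    message: dict,
    handler: Callable[[dict], Awaitable[None]],
    ttl_seconds: float = 86_400.0,
) -> bool:
    """Run handler at most once per orderId; return False for duplicates."""
    message_id = message.get("orderId")
    if not message_id:
        raise ValueError("message has no orderId to deduplicate on")

    if not await store.try_claim(message_id, ttl_seconds):
        return False  # duplicate delivery, skip

    try:
        await handler(message)
    except Exception:
        # Give the claim back so a redelivery can retry the work
        await store.release(message_id)
        raise
    return True
```

The trade-off is worth stating: if the process crashes between the claim and the end of the handler, the claim stays in place until its TTL expires, and a redelivery in that window is skipped. A shorter TTL for in-progress claims, extended once the handler succeeds, is one way to narrow that window.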

Backpressure and Rate Control

Prevent producers from overwhelming consumers.

interface BackpressureConfig {
  prefetchCount: number;
  highWaterMark: number;
  lowWaterMark: number;
}

class BackpressureController {
  private isPaused: boolean = false;

  async handleBackpressure(
    channel: amqp.Channel,
    queueName: string,
    config: BackpressureConfig
  ): Promise<void> {
    // Set prefetch (QoS) to limit in-flight messages
    await channel.prefetch(config.prefetchCount);

    // Monitor queue depth
    const checkQueueDepth = async () => {
      try {
        const queueInfo = await channel.checkQueue(queueName);
        const depth = queueInfo.messageCount;

        if (depth > config.highWaterMark && !this.isPaused) {
          console.log(
            `High backpressure: queue depth ${depth} > ${config.highWaterMark}`
          );
          this.isPaused = true;
          // Pause processing or reduce producer throughput
        } else if (depth < config.lowWaterMark && this.isPaused) {
          console.log(
            `Backpressure relieved: queue depth ${depth} < ${config.lowWaterMark}`
          );
          this.isPaused = false;
          // Resume processing
        }
      } catch (err) {
        // setInterval ignores the returned promise, so handle failures here
        // to avoid an unhandled rejection
        console.error(`Error checking queue depth: ${err}`);
      }
    };

    setInterval(checkQueueDepth, 5000);
  }
}

// Rate limiting producer
class RateLimitedProducer {
  private tokens: number;
  private lastRefillTime: number;

  constructor(
    private tokensPerSecond: number,
    private maxBurst: number
  ) {
    this.tokens = maxBurst;
    this.lastRefillTime = Date.now();
  }

  async acquire(): Promise<void> {
    const now = Date.now();
    const timePassed = (now - this.lastRefillTime) / 1000;
    
    this.tokens = Math.min(
      this.maxBurst,
      this.tokens + timePassed * this.tokensPerSecond
    );
    this.lastRefillTime = now;

    if (this.tokens < 1) {
      const waitTime = (1 - this.tokens) / this.tokensPerSecond * 1000;
      await new Promise((resolve) => setTimeout(resolve, waitTime));
      return this.acquire();
    }

    this.tokens -= 1;
  }

  async publishWithRateLimit(
    publisher: (msg: any) => Promise<void>,
    message: any
  ): Promise<void> {
    await this.acquire();
    await publisher(message);
  }
}
// BackpressureController.java
import com.rabbitmq.client.Channel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class BackpressureController {
    private final AtomicBoolean isPaused = new AtomicBoolean(false);

    public void handleBackpressure(Channel channel, String queueName,
                                    int prefetchCount, int highWaterMark, int lowWaterMark) throws Exception {
        channel.basicQos(prefetchCount);

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                long depth = channel.messageCount(queueName);
                if (depth > highWaterMark && !isPaused.get()) {
                    System.out.printf("High backpressure: queue depth %d > %d%n", depth, highWaterMark);
                    isPaused.set(true);
                } else if (depth < lowWaterMark && isPaused.get()) {
                    System.out.printf("Backpressure relieved: queue depth %d < %d%n", depth, lowWaterMark);
                    isPaused.set(false);
                }
            } catch (Exception e) { e.printStackTrace(); }
        }, 0, 5, TimeUnit.SECONDS);
    }
}

// RateLimitedProducer.java (separate file, since Java allows one public class per file)
// Token bucket rate limiter
public class RateLimitedProducer {
    private final double tokensPerSecond;
    private final double maxBurst;
    private double tokens;
    private long lastRefillTime;

    public RateLimitedProducer(double tokensPerSecond, double maxBurst) {
        this.tokensPerSecond = tokensPerSecond;
        this.maxBurst = maxBurst;
        this.tokens = maxBurst;
        this.lastRefillTime = System.currentTimeMillis();
    }

    public synchronized void acquire() throws InterruptedException {
        long now = System.currentTimeMillis();
        double timePassed = (now - lastRefillTime) / 1000.0;
        tokens = Math.min(maxBurst, tokens + timePassed * tokensPerSecond);
        lastRefillTime = now;

        if (tokens < 1) {
            long waitTime = (long)((1 - tokens) / tokensPerSecond * 1000);
            Thread.sleep(waitTime);
            acquire();
            return;
        }
        tokens -= 1;
    }

    public void publishWithRateLimit(java.util.function.Consumer<Object> publisher, Object message)
            throws InterruptedException {
        acquire();
        publisher.accept(message);
    }
}
# backpressure.py
import asyncio
import time
import aio_pika
from dataclasses import dataclass


@dataclass
class BackpressureConfig:
    prefetch_count: int
    high_water_mark: int
    low_water_mark: int


class BackpressureController:
    def __init__(self):
        self._is_paused = False

    async def handle_backpressure(
        self, channel: aio_pika.Channel, queue_name: str, config: BackpressureConfig
    ) -> None:
        await channel.set_qos(prefetch_count=config.prefetch_count)

        async def check_queue_depth():
            while True:
                await asyncio.sleep(5)
                try:
                    # Passive declare asks the broker for current stats without creating the queue
                    queue = await channel.declare_queue(queue_name, passive=True)
                    depth = queue.declaration_result.message_count

                    if depth > config.high_water_mark and not self._is_paused:
                        print(f"High backpressure: queue depth {depth} > {config.high_water_mark}")
                        self._is_paused = True
                    elif depth < config.low_water_mark and self._is_paused:
                        print(f"Backpressure relieved: queue depth {depth} < {config.low_water_mark}")
                        self._is_paused = False
                except Exception as e:
                    print(f"Error checking queue depth: {e}")

        # Keep a reference so the task is not garbage-collected while running
        self._monitor_task = asyncio.create_task(check_queue_depth())


class RateLimitedProducer:
    def __init__(self, tokens_per_second: float, max_burst: float):
        self.tokens_per_second = tokens_per_second
        self.max_burst = max_burst
        self.tokens = max_burst
        self.last_refill_time = time.monotonic()

    async def acquire(self) -> None:
        now = time.monotonic()
        time_passed = now - self.last_refill_time
        self.tokens = min(self.max_burst, self.tokens + time_passed * self.tokens_per_second)
        self.last_refill_time = now

        if self.tokens < 1:
            wait_time = (1 - self.tokens) / self.tokens_per_second
            await asyncio.sleep(wait_time)
            return await self.acquire()

        self.tokens -= 1

    async def publish_with_rate_limit(self, publisher, message) -> None:
        await self.acquire()
        await publisher(message)
// BackpressureController.cs
using RabbitMQ.Client;

public record BackpressureConfig(int PrefetchCount, int HighWaterMark, int LowWaterMark);

public class BackpressureController
{
    private bool _isPaused;

    public async Task HandleBackpressureAsync(IModel channel, string queueName, BackpressureConfig config)
    {
        channel.BasicQos(0, (ushort)config.PrefetchCount, false);

        _ = Task.Run(async () =>
        {
            while (true)
            {
                await Task.Delay(5000);
                try
                {
                    var queueInfo = channel.QueueDeclarePassive(queueName);
                    var depth = queueInfo.MessageCount;

                    if (depth > config.HighWaterMark && !_isPaused)
                    {
                        Console.WriteLine($"High backpressure: queue depth {depth} > {config.HighWaterMark}");
                        _isPaused = true;
                    }
                    else if (depth < config.LowWaterMark && _isPaused)
                    {
                        Console.WriteLine($"Backpressure relieved: queue depth {depth} < {config.LowWaterMark}");
                        _isPaused = false;
                    }
                }
                catch (Exception e) { Console.Error.WriteLine($"Error checking queue: {e.Message}"); }
            }
        });
    }
}

public class RateLimitedProducer
{
    private readonly double _tokensPerSecond;
    private readonly double _maxBurst;
    private double _tokens;
    private DateTimeOffset _lastRefillTime;
    private readonly SemaphoreSlim _lock = new(1, 1);

    public RateLimitedProducer(double tokensPerSecond, double maxBurst)
    {
        _tokensPerSecond = tokensPerSecond;
        _maxBurst = maxBurst;
        _tokens = maxBurst;
        _lastRefillTime = DateTimeOffset.UtcNow;
    }

    public async Task AcquireAsync()
    {
        // Loop instead of recursing so each WaitAsync is paired with exactly one Release
        while (true)
        {
            int waitMs;
            await _lock.WaitAsync();
            try
            {
                var now = DateTimeOffset.UtcNow;
                var timePassed = (now - _lastRefillTime).TotalSeconds;
                _tokens = Math.Min(_maxBurst, _tokens + timePassed * _tokensPerSecond);
                _lastRefillTime = now;

                if (_tokens >= 1)
                {
                    _tokens -= 1;
                    return;
                }
                waitMs = (int)Math.Ceiling((1 - _tokens) / _tokensPerSecond * 1000);
            }
            finally { _lock.Release(); }

            // Sleep outside the lock so other callers are not blocked while waiting
            await Task.Delay(waitMs);
        }
    }

    public async Task PublishWithRateLimitAsync(Func<object, Task> publisher, object message)
    {
        await AcquireAsync();
        await publisher(message);
    }
}
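
The controllers above flip a paused flag when the queue depth crosses a watermark, but they leave the actual pause as a comment. A minimal sketch of one way to connect that flag to producers follows, assuming an `asyncio`-based producer. `WatermarkGate`, `update`, and `wait_until_open` are hypothetical names, not part of any library used in this article.

```python
import asyncio


class WatermarkGate:
    """Hypothetical gate: closes above high_water_mark, reopens below low_water_mark."""

    def __init__(self, high_water_mark: int, low_water_mark: int) -> None:
        if low_water_mark >= high_water_mark:
            raise ValueError("low_water_mark must be below high_water_mark")
        self.high_water_mark = high_water_mark
        self.low_water_mark = low_water_mark
        self._open = asyncio.Event()
        self._open.set()  # start open: producers may publish

    @property
    def is_paused(self) -> bool:
        return not self._open.is_set()

    def update(self, depth: int) -> None:
        # Hysteresis: between the two marks the previous state is kept,
        # which avoids flapping when depth hovers near one threshold.
        if depth > self.high_water_mark:
            self._open.clear()
        elif depth < self.low_water_mark:
            self._open.set()

    async def wait_until_open(self) -> None:
        # Producers await this before each publish; it returns immediately
        # while the gate is open and blocks while it is closed.
        await self._open.wait()
```

In this arrangement the depth-polling loop would call `gate.update(depth)` every few seconds, and each producer would `await gate.wait_until_open()` before publishing. Creating the gate inside the running event loop keeps it compatible with older Python versions that bind `asyncio.Event` to a loop at construction.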

Monitoring: Queue Depth, Consumer Lag, and Processing Time

interface QueueMetrics {
  queueDepth: number;
  consumerLag: number;
  avgProcessingTime: number;
  errorRate: number;
  throughput: number; // messages/second
}

class QueueMonitor {
  private metrics: Map<string, QueueMetrics> = new Map();
  private processingTimes: number[] = [];
  private errorCount: number = 0;
  private successCount: number = 0;
  private lastReportTime: number = Date.now();

  recordProcessingTime(durationMs: number): void {
    this.processingTimes.push(durationMs);
    this.successCount++;

    // Keep only last 1000 measurements
    if (this.processingTimes.length > 1000) {
      this.processingTimes.shift();
    }
  }

  recordError(): void {
    this.errorCount++;
  }

  async getMetrics(
    channel: amqp.Channel,
    queueName: string
  ): Promise<QueueMetrics> {
    const queueInfo = await channel.checkQueue(queueName);
    const now = Date.now();
    const timeSinceLastReport = (now - this.lastReportTime) / 1000;

    const avgProcessingTime =
      this.processingTimes.length > 0
        ? this.processingTimes.reduce((a, b) => a + b, 0) / this.processingTimes.length
        : 0;

    const throughput =
      timeSinceLastReport > 0
        ? this.successCount / timeSinceLastReport
        : 0;

    const errorRate =
      this.successCount + this.errorCount > 0
        ? this.errorCount / (this.successCount + this.errorCount)
        : 0;

    const metrics: QueueMetrics = {
      queueDepth: queueInfo.messageCount,
      consumerLag: queueInfo.messageCount * (avgProcessingTime / 1000),
      avgProcessingTime,
      errorRate,
      throughput,
    };

    // Reset counters for next period
    this.lastReportTime = now;
    this.successCount = 0;
    this.errorCount = 0;

    return metrics;
  }

  async reportMetrics(
    channel: amqp.Channel,
    queueName: string
  ): Promise<void> {
    const metrics = await this.getMetrics(channel, queueName);

    console.log(`Queue: ${queueName}`);
    console.log(`  Depth: ${metrics.queueDepth} messages`);
    console.log(`  Lag: ${metrics.consumerLag.toFixed(2)} seconds`);
    console.log(`  Avg Processing Time: ${metrics.avgProcessingTime.toFixed(2)}ms`);
    console.log(`  Throughput: ${metrics.throughput.toFixed(2)} msg/s`);
    console.log(`  Error Rate: ${(metrics.errorRate * 100).toFixed(2)}%`);
  }
}
// QueueMonitor.java
import com.rabbitmq.client.Channel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

public class QueueMonitor {

    public record QueueMetrics(long queueDepth, double consumerLag, double avgProcessingTime,
                                double errorRate, double throughput) {}

    private final Deque<Long> processingTimes = new ArrayDeque<>();
    private final AtomicLong errorCount = new AtomicLong(0);
    private final AtomicLong successCount = new AtomicLong(0);
    private long lastReportTime = System.currentTimeMillis();

    // synchronized: ArrayDeque is not thread-safe, and consumers may record from several threads
    public synchronized void recordProcessingTime(long durationMs) {
        processingTimes.addLast(durationMs);
        successCount.incrementAndGet();
        while (processingTimes.size() > 1000) processingTimes.pollFirst();
    }

    public void recordError() { errorCount.incrementAndGet(); }

    public synchronized QueueMetrics getMetrics(Channel channel, String queueName) throws Exception {
        long queueDepth = channel.messageCount(queueName);
        long now = System.currentTimeMillis();
        double timeSinceLastReport = (now - lastReportTime) / 1000.0;

        double avgProcessingTime = processingTimes.isEmpty() ? 0
            : processingTimes.stream().mapToLong(Long::longValue).average().orElse(0);

        // Read and reset in one atomic step so counts recorded in between are not lost
        long sc = successCount.getAndSet(0), ec = errorCount.getAndSet(0);
        double throughput = timeSinceLastReport > 0 ? sc / timeSinceLastReport : 0;
        double errorRate = (sc + ec) > 0 ? (double) ec / (sc + ec) : 0;

        lastReportTime = now;

        return new QueueMetrics(queueDepth, queueDepth * (avgProcessingTime / 1000),
            avgProcessingTime, errorRate, throughput);
    }

    public void reportMetrics(Channel channel, String queueName) throws Exception {
        QueueMetrics m = getMetrics(channel, queueName);
        System.out.printf("Queue: %s%n", queueName);
        System.out.printf("  Depth: %d messages%n", m.queueDepth());
        System.out.printf("  Lag: %.2f seconds%n", m.consumerLag());
        System.out.printf("  Avg Processing Time: %.2fms%n", m.avgProcessingTime());
        System.out.printf("  Throughput: %.2f msg/s%n", m.throughput());
        System.out.printf("  Error Rate: %.2f%%%n", m.errorRate() * 100);
    }
}
# queue_monitor.py
import time
import asyncio
from collections import deque
from dataclasses import dataclass
import aio_pika


@dataclass
class QueueMetrics:
    queue_depth: int
    consumer_lag: float
    avg_processing_time: float
    error_rate: float
    throughput: float


class QueueMonitor:
    def __init__(self):
        self.processing_times: deque[float] = deque(maxlen=1000)
        self.error_count = 0
        self.success_count = 0
        self.last_report_time = time.monotonic()

    def record_processing_time(self, duration_ms: float) -> None:
        self.processing_times.append(duration_ms)
        self.success_count += 1

    def record_error(self) -> None:
        self.error_count += 1

    async def get_metrics(self, channel: aio_pika.Channel, queue_name: str) -> QueueMetrics:
        queue = await channel.declare_queue(queue_name, passive=True)
        queue_depth = queue.declaration_result.message_count
        now = time.monotonic()
        time_since_last = now - self.last_report_time

        avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times else 0
        )
        throughput = self.success_count / time_since_last if time_since_last > 0 else 0
        total = self.success_count + self.error_count
        error_rate = self.error_count / total if total > 0 else 0

        self.last_report_time = now
        self.success_count = 0
        self.error_count = 0

        return QueueMetrics(
            queue_depth=queue_depth,
            consumer_lag=queue_depth * (avg_processing_time / 1000),
            avg_processing_time=avg_processing_time,
            error_rate=error_rate,
            throughput=throughput,
        )

    async def report_metrics(self, channel: aio_pika.Channel, queue_name: str) -> None:
        m = await self.get_metrics(channel, queue_name)
        print(f"Queue: {queue_name}")
        print(f"  Depth: {m.queue_depth} messages")
        print(f"  Lag: {m.consumer_lag:.2f} seconds")
        print(f"  Avg Processing Time: {m.avg_processing_time:.2f}ms")
        print(f"  Throughput: {m.throughput:.2f} msg/s")
        print(f"  Error Rate: {m.error_rate * 100:.2f}%")
// QueueMonitor.cs
using RabbitMQ.Client;
using System.Collections.Concurrent;

public record QueueMetrics(
    long QueueDepth, double ConsumerLag, double AvgProcessingTime,
    double ErrorRate, double Throughput);

public class QueueMonitor
{
    private readonly ConcurrentQueue<double> _processingTimes = new();
    private long _errorCount;
    private long _successCount;
    private DateTimeOffset _lastReportTime = DateTimeOffset.UtcNow;

    public void RecordProcessingTime(double durationMs)
    {
        _processingTimes.Enqueue(durationMs);
        Interlocked.Increment(ref _successCount);

        // Keep only last 1000
        while (_processingTimes.Count > 1000)
            _processingTimes.TryDequeue(out _);
    }

    public void RecordError() => Interlocked.Increment(ref _errorCount);

    public QueueMetrics GetMetrics(IModel channel, string queueName)
    {
        var queueInfo = channel.QueueDeclarePassive(queueName);
        var now = DateTimeOffset.UtcNow;
        var timeSinceLastReport = (now - _lastReportTime).TotalSeconds;

        var times = _processingTimes.ToArray();
        double avgProcessingTime = times.Length > 0 ? times.Average() : 0;

        long sc = Interlocked.Exchange(ref _successCount, 0);
        long ec = Interlocked.Exchange(ref _errorCount, 0);

        double throughput = timeSinceLastReport > 0 ? sc / timeSinceLastReport : 0;
        double errorRate = (sc + ec) > 0 ? (double)ec / (sc + ec) : 0;

        _lastReportTime = now;

        return new QueueMetrics(
            queueInfo.MessageCount,
            queueInfo.MessageCount * (avgProcessingTime / 1000),
            avgProcessingTime,
            errorRate,
            throughput);
    }

    public void ReportMetrics(IModel channel, string queueName)
    {
        var m = GetMetrics(channel, queueName);
        Console.WriteLine($"Queue: {queueName}");
        Console.WriteLine($"  Depth: {m.QueueDepth} messages");
        Console.WriteLine($"  Lag: {m.ConsumerLag:F2} seconds");
        Console.WriteLine($"  Avg Processing Time: {m.AvgProcessingTime:F2}ms");
        Console.WriteLine($"  Throughput: {m.Throughput:F2} msg/s");
        Console.WriteLine($"  Error Rate: {m.ErrorRate * 100:F2}%");
    }
}
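
The monitors above estimate consumer lag as queue depth multiplied by average processing time. That figure is the time one consumer would need to drain the queue while handling one message at a time. A small sketch of the same arithmetic with parallel consumers taken into account follows; `estimate_drain_seconds` and its `consumer_count` parameter are assumptions added for illustration, and the estimate ignores messages that arrive while the queue drains.

```python
def estimate_drain_seconds(
    queue_depth: int,
    avg_processing_ms: float,
    consumer_count: int = 1,
) -> float:
    """Rough time to empty the queue, assuming no new arrivals."""
    if consumer_count < 1:
        raise ValueError("consumer_count must be at least 1")
    if queue_depth < 0 or avg_processing_ms < 0:
        raise ValueError("queue_depth and avg_processing_ms must be non-negative")
    # depth * seconds-per-message gives the single-consumer drain time;
    # dividing by the number of consumers assumes they work in parallel
    # at the same average speed.
    return queue_depth * (avg_processing_ms / 1000.0) / consumer_count
```

With 1,000 queued messages at 50 ms each, a single consumer needs about 50 seconds and four consumers about 12.5 seconds. Comparing this number against an SLA is usually more useful for alerting than raw depth alone.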

Message Queue Comparison

| Feature | RabbitMQ | Amazon SQS | Redis Streams |
|---|---|---|---|
| Delivery Guarantee | At-most-once or at-least-once | At-least-once (standard); exactly-once processing (FIFO queues) | At-least-once |
| Message Ordering | Per-queue (FIFO) | FIFO queues only | Partial (consumer groups) |
| Routing | Exchanges + bindings | Simple queue model | Stream keys |
| Persistence | Disk + memory | Durable (AWS managed) | Memory (snapshots) |
| Throughput | High | Very high | High |
| Latency | Low | Low (API call overhead) | Very low |
| Operational Complexity | High (self-hosted) | Low (managed) | Medium |
| Dead-Letter Queues | Built-in | Built-in | Manual implementation |
| Monitoring | Built-in plugin | CloudWatch | Redis-native tools |
| Cost | Infrastructure | Pay-per-request | Infrastructure |
| Best For | Complex routing, high reliability | Scalability, AWS ecosystem | High throughput, low latency |

Production Checklist

  • Connection Pooling: Reuse connections; maintain a pool of channels/connections
  • Error Handling: Use exponential backoff for transient failures
  • Monitoring: Track queue depth, consumer lag, error rate, and processing time
  • Graceful Shutdown: Drain in-flight messages before stopping consumers
  • Message Serialization: Use versioned schemas (Protocol Buffers, Avro) for backward compatibility
  • Idempotency: Design consumers to handle duplicate message processing safely
  • Dead-Letter Queues: Configure DLQs for failed messages; monitor and alert on DLQ growth
  • Acknowledgment Strategy: Use manual acknowledgment (not auto-ack) for at-least-once delivery
  • Timeouts: Set consumer processing timeouts shorter than the visibility timeout
  • Backpressure: Use prefetch limits and monitor queue depth
  • Retry Strategy: Exponential backoff with jitter; cap retries before routing to the DLQ
  • Security: Use TLS in transit; authenticate producers and consumers
  • Testing: Run integration tests against a message broker started with Testcontainers
  • Documentation: Document message schemas, routing rules, and SLAs
  • Disaster Recovery: Test failover scenarios; document recovery procedures
  • Message TTL: Set message expiration to prevent unbounded queue growth
  • Circuit Breaker: Stop consuming when a downstream service is down; fail fast
  • Capacity Planning: Track throughput trends; plan for peak load
  • Alerting: Alert on queue depth thresholds, error rate spikes, and consumer lag growth
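
The retry strategy item in the checklist above can be sketched as follows. This is a minimal illustration of exponential backoff with full jitter and a retry cap, not a prescribed implementation. `backoff_delay`, `next_action`, and the default values (0.5 s base, 30 s cap, 5 retries) are assumptions chosen for the example.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_seconds: float = 0.5,
    cap_seconds: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Delay before retry number `attempt` (0-based), using full jitter."""
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    source = rng if rng is not None else random
    # The ceiling doubles with each attempt until it reaches the cap
    ceiling = min(cap_seconds, base_seconds * (2 ** attempt))
    # Full jitter: pick uniformly in [0, ceiling] so retries from many
    # consumers do not line up on the same instant
    return source.uniform(0.0, ceiling)


def next_action(attempt: int, max_retries: int = 5) -> str:
    """Decide whether a failed message is retried or sent to the DLQ."""
    return "retry" if attempt < max_retries else "dead-letter"
```

A consumer would call `next_action(attempt)` after a failure, sleep for `backoff_delay(attempt)` when the answer is `"retry"`, and otherwise reject the message so the broker routes it to the dead-letter queue. Passing a seeded `random.Random` makes the delays reproducible in tests.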

Message queues are a foundation of reliable distributed systems. The patterns and implementations covered here scale from small teams to enterprise deployments. Start simple, monitor closely, and evolve your queue infrastructure as your system grows.



Written by พลากร วรมงคล

Software Engineer Specialist with more than 20 years of experience, writing about architecture, performance, and building production systems.
