Architecture Backend Messaging Microservices

Message Queue Communication Server-to-Server Communication Technologies

Palakorn Voramongkol
April 17, 2025 13 min read

“A comprehensive guide to message queues for server-to-server communication — covering RabbitMQ, Amazon SQS, delivery guarantees, dead-letter queues, backpressure, and production patterns.”

Deep Dive: Message Queue Communication

Message queues are the backbone of modern distributed systems. They decouple services, buffer workloads, and ensure reliable message delivery across your infrastructure. This guide covers the concepts, implementations, and production patterns you need to build robust messaging systems.

What are Message Queues?

Message queues provide asynchronous communication between services. Instead of Service A calling Service B directly and waiting for a response (synchronous), Service A places a message in a queue, and Service B consumes it when ready. This gives you temporal decoupling (the services don’t need to be running at the same time) and spatial decoupling (the services don’t need to know about each other).

The Point-to-Point Model

In the simplest model, a producer sends messages to a queue, and consumers pull messages from that queue. Each message is processed by exactly one consumer. This differs from pub/sub, where one message goes to many subscribers.

sequenceDiagram
    participant Producer
    participant Queue
    participant Consumer

    Producer->>Queue: Send message ("process_order")
    Queue->>Queue: Store message in buffer
    Note over Queue: Message persisted to disk
    Queue-->>Producer: Acknowledgment
    Consumer->>Queue: Poll for messages
    Queue-->>Consumer: Return message
    Consumer->>Consumer: Process message
    Consumer->>Queue: Send acknowledgment
    Queue->>Queue: Delete message
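
The point-to-point flow in the diagram can be sketched in-process with Python's standard library. This is only an illustration, not a broker: `run_point_to_point` and the worker setup are hypothetical names, and `task_done()` stands in for the consumer acknowledgment. It shows that when several consumers compete on one queue, each message is handled by exactly one of them.

```python
import queue
import threading


def run_point_to_point(messages, worker_count=2):
    """Deliver each message to exactly one of several competing workers."""
    q = queue.Queue()
    handled = []              # (worker_id, message) pairs
    lock = threading.Lock()

    def worker(worker_id):
        while True:
            msg = q.get()
            if msg is None:   # sentinel: no more work for this worker
                q.task_done()
                return
            with lock:
                handled.append((worker_id, msg))
            q.task_done()     # plays the role of the consumer acknowledgment

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(worker_count)]
    for t in threads:
        t.start()
    for msg in messages:
        q.put(msg)
    for _ in threads:
        q.put(None)           # one sentinel per worker
    q.join()
    for t in threads:
        t.join()
    return handled


handled = run_point_to_point(["order-1", "order-2", "order-3", "order-4"])
```

Which worker receives which message varies from run to run, but no message is handled twice.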

Delivery Guarantees

Message queue systems offer three levels of delivery guarantees:

At-Most-Once: Messages may be lost but won’t be delivered twice. Fire-and-forget semantics.

At-Least-Once: Messages won’t be lost but may be delivered multiple times. Requires idempotent consumers.

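Exactly-Once: Each message takes effect precisely once. This is the strongest and most expensive guarantee; in practice it is usually approximated by combining at-least-once delivery with deduplication or idempotent processing.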
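
At-least-once delivery means a consumer may see the same message twice, so the consumer has to make redelivery harmless. The sketch below is one minimal way to do that, assuming every message carries a stable ID. `IdempotentConsumer` is a hypothetical helper, and the in-memory set stands in for what would normally be a durable store.

```python
class IdempotentConsumer:
    """Runs the handler at most once per message ID."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # assumption: a real system would persist this

    def handle(self, message_id, payload):
        """Return True if the handler ran, False if the message was a duplicate."""
        if message_id in self._seen:
            return False
        self._handler(payload)
        # Record the ID only after the handler succeeds, so a failure
        # inside the handler still leads to a retry on redelivery.
        self._seen.add(message_id)
        return True


charged = []
consumer = IdempotentConsumer(charged.append)
first = consumer.handle("ORD-12345", {"total": 149.99})
second = consumer.handle("ORD-12345", {"total": 149.99})  # simulated redelivery
```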

Core Principles

  • Decoupling: Producers and consumers operate independently, reducing tight dependencies
  • Buffering: Queues absorb traffic spikes, preventing cascading failures when consumers fall behind
  • Persistence: Messages are stored durably (usually on disk) to survive system failures
  • Ordering: Many systems guarantee FIFO ordering within a single queue or partition
  • Acknowledgment: Consumers explicitly confirm processing; unacknowledged messages are retried
  • Backpressure: Mechanisms to prevent producers from overwhelming slow consumers
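
Buffering and backpressure can be illustrated with a bounded in-process buffer. This is a sketch under simple assumptions, not how a broker implements flow control: `try_publish` is a hypothetical helper, and the bounded `queue.Queue` stands in for a queue with a length limit. Once the buffer is full, the producer is told to slow down instead of growing the backlog without bound.

```python
import queue


def try_publish(buffer, message, timeout=0.01):
    """Try to enqueue; return False when the buffer stays full (backpressure)."""
    try:
        buffer.put(message, timeout=timeout)
        return True
    except queue.Full:
        return False


buffer = queue.Queue(maxsize=2)          # small limit to make the effect visible
results = [try_publish(buffer, f"msg-{i}") for i in range(3)]

buffer.get()                             # a consumer drains one message
results.append(try_publish(buffer, "msg-3"))
```

The third publish is refused because the buffer is full; after the consumer takes one message, publishing succeeds again.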

RabbitMQ: Exchange-Based Routing

RabbitMQ uses exchanges to route messages to queues based on routing keys and bindings. This decouples message producers from queue topology.

Core Components

Exchange: Receives messages from producers and routes them to bound queues. Four built-in types: Direct (exact routing key match), Topic (pattern match), Fanout (broadcast to every bound queue), and Headers (match on message header attributes).

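Queue: Stores messages until they are consumed. Messages survive a broker restart only if the queue is declared durable and the messages are published as persistent.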

Binding: A rule that links an exchange to a queue, optionally with a routing key or pattern. An exchange can have many bindings.

Routing Key: A string producers attach to messages; exchanges use it to decide which queue(s) receive the message.
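
To make topic routing concrete, here is a small sketch of how a topic pattern is compared with a routing key. It follows RabbitMQ's documented rules (words are separated by dots, `*` matches exactly one word, `#` matches zero or more words), but `topic_matches` is an illustrative function, not part of any client library, and the broker's own implementation differs.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches a topic-exchange binding pattern."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)           # pattern used up: key must be too
        if p[i] == "#":
            # '#' may absorb zero or more words
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False                 # key used up, pattern is not
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)   # '*' or a literal word consumes one word
        return False

    return match(0, 0)


created = topic_matches("orders.*.created", "orders.CUST-789.created")
too_short = topic_matches("orders.*.created", "orders.created")
everything = topic_matches("orders.#", "orders.CUST-789.created")
```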

RabbitMQ Producer Example

import amqp from 'amqplib';

interface OrderMessage {
  orderId: string;
  customerId: string;
  total: number;
  timestamp: Date;
}

class RabbitMQProducer {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;

  async connect(url: string = 'amqp://localhost'): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Declare exchange (idempotent)
    const exchangeName = 'orders';
    await this.channel.assertExchange(exchangeName, 'topic', { durable: true });
  }

  async publishOrder(message: OrderMessage): Promise<boolean> {
    if (!this.channel) {
      throw new Error('Producer not connected');
    }

    const exchangeName = 'orders';
    const routingKey = `orders.${message.customerId}.created`;
    const messageBuffer = Buffer.from(JSON.stringify(message));

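    // publish() returns false when the channel's write buffer is full (wait for
    // 'drain' before sending more). It does not confirm that the broker stored the
    // message; use createConfirmChannel() and waitForConfirms() for that.
    return this.channel.publish(
      exchangeName,
      routingKey,
      messageBuffer,
      {
        persistent: true,
        contentType: 'application/json',
        timestamp: Math.floor(Date.now() / 1000), // AMQP timestamps are in seconds
        // Stable message ID so consumers can deduplicate redeliveries
        messageId: message.orderId,
      }
    );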
  }

  async close(): Promise<void> {
    if (this.channel) {
      await this.channel.close();
    }
    if (this.connection) {
      await this.connection.close();
    }
  }
}

// Usage
const producer = new RabbitMQProducer();
await producer.connect('amqp://guest:guest@localhost');

await producer.publishOrder({
  orderId: 'ORD-12345',
  customerId: 'CUST-789',
  total: 149.99,
  timestamp: new Date(),
});

await producer.close();

// RabbitMQProducer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

public class RabbitMQProducer {
    private Connection connection;
    private Channel channel;
    private static final ObjectMapper mapper = new ObjectMapper();

    public record OrderMessage(String orderId, String customerId, double total, Date timestamp) {}

    public void connect(String url) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(url);
        this.connection = factory.newConnection();
        this.channel = connection.createChannel();

        // Declare exchange (idempotent)
        channel.exchangeDeclare("orders", "topic", true);
    }

    public boolean publishOrder(OrderMessage message) throws IOException {
        if (channel == null) throw new IllegalStateException("Producer not connected");

        String exchangeName = "orders";
        String routingKey = "orders." + message.customerId() + ".created";
        byte[] body = mapper.writeValueAsBytes(message);

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .deliveryMode(2) // persistent
            .contentType("application/json")
            .timestamp(new Date())
            .messageId(message.orderId()) // stable ID so consumers can deduplicate redeliveries
            .build();

        channel.basicPublish(exchangeName, routingKey, props, body);
        return true;
    }

    public void close() throws IOException, TimeoutException {
        if (channel != null) channel.close();
        if (connection != null) connection.close();
    }

    // Usage
    public static void main(String[] args) throws Exception {
        RabbitMQProducer producer = new RabbitMQProducer();
        producer.connect("amqp://guest:guest@localhost");

        producer.publishOrder(new OrderMessage("ORD-12345", "CUST-789", 149.99, new Date()));

        producer.close();
    }
}

# rabbitmq_producer.py
import aio_pika
import json
from datetime import datetime
from dataclasses import dataclass, asdict


@dataclass
class OrderMessage:
    order_id: str
    customer_id: str
    total: float
    timestamp: str


class RabbitMQProducer:
    def __init__(self):
        self.connection = None
        self.channel = None

    async def connect(self, url: str = "amqp://localhost") -> None:
        self.connection = await aio_pika.connect_robust(url)
        self.channel = await self.connection.channel()

        # Declare exchange (idempotent)
        self.exchange = await self.channel.declare_exchange(
            "orders", aio_pika.ExchangeType.TOPIC, durable=True
        )

    async def publish_order(self, message: OrderMessage) -> None:
        if not self.channel:
            raise RuntimeError("Producer not connected")

        routing_key = f"orders.{message.customer_id}.created"
        body = json.dumps(asdict(message)).encode()

        msg = aio_pika.Message(
            body=body,
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            content_type="application/json",
            message_id=message.order_id,  # stable ID so consumers can deduplicate redeliveries
        )
        await self.exchange.publish(msg, routing_key=routing_key)

    async def close(self) -> None:
        if self.connection:
            await self.connection.close()


# Usage
import asyncio

async def main():
    producer = RabbitMQProducer()
    await producer.connect("amqp://guest:guest@localhost")

    await producer.publish_order(OrderMessage(
        order_id="ORD-12345",
        customer_id="CUST-789",
        total=149.99,
        timestamp=datetime.now().isoformat(),
    ))

    await producer.close()

asyncio.run(main())

// RabbitMQProducer.cs
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

public record OrderMessage(string OrderId, string CustomerId, double Total, DateTime Timestamp);

public class RabbitMQProducer : IDisposable
{
    private IConnection? _connection;
    private IModel? _channel;

    public void Connect(string url = "amqp://localhost")
    {
        var factory = new ConnectionFactory { Uri = new Uri(url) };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        // Declare exchange (idempotent)
        _channel.ExchangeDeclare("orders", ExchangeType.Topic, durable: true);
    }

    public bool PublishOrder(OrderMessage message)
    {
        if (_channel == null) throw new InvalidOperationException("Producer not connected");

        var routingKey = $"orders.{message.CustomerId}.created";
        var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));

        var props = _channel.CreateBasicProperties();
        props.Persistent = true;
        props.ContentType = "application/json";
        props.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
        props.MessageId = message.OrderId; // stable ID so consumers can deduplicate redeliveries

        _channel.BasicPublish("orders", routingKey, props, body);
        return true;
    }

    public void Dispose()
    {
        _channel?.Close();
        _connection?.Close();
    }
}

// Usage
using var producer = new RabbitMQProducer();
producer.Connect("amqp://guest:guest@localhost");
producer.PublishOrder(new OrderMessage("ORD-12345", "CUST-789", 149.99, DateTime.UtcNow));

RabbitMQ Consumer with Acknowledgment

import amqp, { Message } from 'amqplib';

interface OrderMessage {
  orderId: string;
  customerId: string;
  total: number;
  timestamp: Date;
}

interface ProcessorConfig {
  exchangeName: string;
  queueName: string;
  routingPatterns: string[];
  prefetchCount: number;
  maxRetries: number;
}

class RabbitMQConsumer {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;
  private config: ProcessorConfig;

  constructor(config: ProcessorConfig) {
    this.config = config;
  }

  async connect(url: string = 'amqp://localhost'): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Set prefetch (QoS): cap how many unacknowledged messages this consumer holds at once
    await this.channel.prefetch(this.config.prefetchCount);

    // Declare exchange and queue
    await this.channel.assertExchange(this.config.exchangeName, 'topic', {
      durable: true,
    });

    await this.channel.assertQueue(this.config.queueName, {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': `${this.config.exchangeName}-dlx`,
      },
    });

    // Bind queue to exchange with routing patterns
    for (const pattern of this.config.routingPatterns) {
      await this.channel.bindQueue(
        this.config.queueName,
        this.config.exchangeName,
        pattern
      );
    }
  }

  async consume(
    handler: (message: OrderMessage) => Promise<void>
  ): Promise<void> {
    if (!this.channel) {
      throw new Error('Consumer not connected');
    }

    await this.channel.consume(
      this.config.queueName,
      async (msg: Message | null) => {
        if (!msg) return;

        try {
          const content = JSON.parse(msg.content.toString()) as OrderMessage;
          // x-death is added only when the broker dead-letters a message, so its
          // count tracks trips through the DLX, not plain requeues
          const retries = msg.properties.headers?.['x-death']?.[0]?.count ?? 0;

          if (retries > this.config.maxRetries) {
            console.error(
              `Message ${msg.properties.messageId} exceeded max retries. Moving to DLQ.`
            );
            // Reject without requeue so the broker routes it to the dead-letter exchange
            this.channel!.nack(msg, false, false);
            return;
          }

          // Process with timeout
          await Promise.race([
            handler(content),
            new Promise((_, reject) =>
              setTimeout(
                () => reject(new Error('Processing timeout')),
                30000 // 30 second timeout
              )
            ),
          ]);

          // Acknowledge successful processing
          this.channel!.ack(msg);
          console.log(
            `Processed order ${content.orderId} successfully`
          );
        } catch (error) {
          console.error(
            `Error processing message ${msg.properties.messageId}:`,
            error
          );

          // Negative acknowledgment with requeue: the broker redelivers the message
          // right away, and no x-death entry is added for a plain requeue
          this.channel!.nack(msg, false, true);
        }
      },
      { noAck: false } // Manual acknowledgment mode
    );

    console.log(`Consumer listening on queue: ${this.config.queueName}`);
  }

  async close(): Promise<void> {
    if (this.channel) {
      await this.channel.close();
    }
    if (this.connection) {
      await this.connection.close();
    }
  }
}

// Usage
const consumer = new RabbitMQConsumer({
  exchangeName: 'orders',
  queueName: 'order-processing-queue',
  routingPatterns: ['orders.*.created'],
  prefetchCount: 1,
  maxRetries: 3,
});

await consumer.connect('amqp://guest:guest@localhost');

await consumer.consume(async (order: OrderMessage) => {
  // Simulate processing
  console.log(`Processing order ${order.orderId} for customer ${order.customerId}`);
  
  if (order.total < 0) {
    throw new Error('Invalid order amount');
  }

  // Actual processing logic here
  await new Promise((resolve) => setTimeout(resolve, 1000));
});

// RabbitMQConsumer.java
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class RabbitMQConsumer {

    public record ProcessorConfig(
        String exchangeName, String queueName,
        List<String> routingPatterns, int prefetchCount, int maxRetries) {}

    private Connection connection;
    private Channel channel;
    private final ProcessorConfig config;
    private static final ObjectMapper mapper = new ObjectMapper();

    public RabbitMQConsumer(ProcessorConfig config) { this.config = config; }

    public void connect(String url) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(url);
        this.connection = factory.newConnection();
        this.channel = connection.createChannel();

        channel.basicQos(config.prefetchCount());

        channel.exchangeDeclare(config.exchangeName(), "topic", true);
        channel.queueDeclare(config.queueName(), true, false, false,
            Map.of("x-dead-letter-exchange", config.exchangeName() + "-dlx"));

        for (String pattern : config.routingPatterns()) {
            channel.queueBind(config.queueName(), config.exchangeName(), pattern);
        }
    }

    public void consume(Consumer<Map<String, Object>> handler) throws IOException {
        if (channel == null) throw new IllegalStateException("Consumer not connected");

        channel.basicConsume(config.queueName(), false, (consumerTag, delivery) -> {
            String messageId = delivery.getProperties().getMessageId();
            try {
                @SuppressWarnings("unchecked")
                Map<String, Object> content = mapper.readValue(
                    new String(delivery.getBody(), StandardCharsets.UTF_8), Map.class);

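                // x-death is added only when the broker dead-letters a message; its
                // "count" field tracks trips through the DLX, not plain requeues
                Map<String, Object> headers = delivery.getProperties().getHeaders();
                long retries = 0;
                if (headers != null && headers.get("x-death") instanceof List<?> deaths
                        && !deaths.isEmpty() && deaths.get(0) instanceof Map<?, ?> death
                        && death.get("count") instanceof Number count) {
                    retries = count.longValue();
                }

                if (retries > config.maxRetries()) {
                    System.err.println("Message " + messageId + " exceeded max retries. Moving to DLQ.");
                    // Reject without requeue so the broker routes it to the dead-letter exchange
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                    return;
                }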

                handler.accept(content);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println("Processed order " + content.get("orderId") + " successfully");

            } catch (Exception e) {
                System.err.println("Error processing message " + messageId + ": " + e.getMessage());
                try {
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                } catch (IOException ex) { ex.printStackTrace(); }
            }
        }, consumerTag -> {});

        System.out.println("Consumer listening on queue: " + config.queueName());
    }

    public void close() throws IOException, TimeoutException {
        if (channel != null) channel.close();
        if (connection != null) connection.close();
    }
}

# rabbitmq_consumer.py
import aio_pika
import json
import asyncio
from dataclasses import dataclass
from typing import Callable, Awaitable


@dataclass
class ProcessorConfig:
    exchange_name: str
    queue_name: str
    routing_patterns: list[str]
    prefetch_count: int
    max_retries: int


class RabbitMQConsumer:
    def __init__(self, config: ProcessorConfig):
        self.config = config
        self.connection = None
        self.channel = None

    async def connect(self, url: str = "amqp://localhost") -> None:
        self.connection = await aio_pika.connect_robust(url)
        self.channel = await self.connection.channel()

        await self.channel.set_qos(prefetch_count=self.config.prefetch_count)

        exchange = await self.channel.declare_exchange(
            self.config.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
        )

        queue = await self.channel.declare_queue(
            self.config.queue_name,
            durable=True,
            arguments={"x-dead-letter-exchange": f"{self.config.exchange_name}-dlx"},
        )

        for pattern in self.config.routing_patterns:
            await queue.bind(exchange, routing_key=pattern)

        self.queue = queue

    async def consume(self, handler: Callable[[dict], Awaitable[None]]) -> None:
        if not self.channel:
            raise RuntimeError("Consumer not connected")

        async def process_message(message: aio_pika.IncomingMessage):
            message_id = message.message_id
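            # ignore_processed=True lets the handler reject explicitly inside the block
            async with message.process(requeue=True, ignore_processed=True):
                try:
                    content = json.loads(message.body.decode())

                    # x-death is added only when the broker dead-letters a message;
                    # its "count" tracks trips through the DLX, not plain requeues
                    deaths = (message.headers or {}).get("x-death") or []
                    retries = deaths[0].get("count", 0) if deaths else 0

                    if retries > self.config.max_retries:
                        print(f"Message {message_id} exceeded max retries. Moving to DLQ.")
                        # Reject without requeue so the broker routes it to the DLX
                        await message.reject(requeue=False)
                        return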

                    await asyncio.wait_for(handler(content), timeout=30)
                    print(f"Processed order {content.get('order_id')} successfully")

                except asyncio.TimeoutError:
                    print(f"Processing timeout for message {message_id}")
                    raise
                except Exception as e:
                    print(f"Error processing message {message_id}: {e}")
                    raise

        await self.queue.consume(process_message)
        print(f"Consumer listening on queue: {self.config.queue_name}")

    async def close(self) -> None:
        if self.connection:
            await self.connection.close()


# Usage
async def main():
    consumer = RabbitMQConsumer(ProcessorConfig(
        exchange_name="orders",
        queue_name="order-processing-queue",
        routing_patterns=["orders.*.created"],
        prefetch_count=1,
        max_retries=3,
    ))

    await consumer.connect("amqp://guest:guest@localhost")

    async def handle_order(order: dict):
        print(f"Processing order {order['order_id']} for customer {order['customer_id']}")
        if order["total"] < 0:
            raise ValueError("Invalid order amount")
        await asyncio.sleep(1)

    await consumer.consume(handle_order)
    await asyncio.Future()  # Run forever

asyncio.run(main())

// RabbitMQConsumer.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Text.Json;

public record ProcessorConfig(
    string ExchangeName, string QueueName,
    string[] RoutingPatterns, int PrefetchCount, int MaxRetries);

public class RabbitMQConsumer : IDisposable
{
    private IConnection? _connection;
    private IModel? _channel;
    private readonly ProcessorConfig _config;

    public RabbitMQConsumer(ProcessorConfig config) { _config = config; }

    public void Connect(string url = "amqp://localhost")
    {
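        // DispatchConsumersAsync is required for AsyncEventingBasicConsumer handlers to run
        var factory = new ConnectionFactory { Uri = new Uri(url), DispatchConsumersAsync = true };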
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.BasicQos(0, (ushort)_config.PrefetchCount, false);

        _channel.ExchangeDeclare(_config.ExchangeName, ExchangeType.Topic, durable: true);
        _channel.QueueDeclare(_config.QueueName, durable: true, exclusive: false, autoDelete: false,
            arguments: new Dictionary<string, object>
            {
                ["x-dead-letter-exchange"] = $"{_config.ExchangeName}-dlx"
            });

        foreach (var pattern in _config.RoutingPatterns)
            _channel.QueueBind(_config.QueueName, _config.ExchangeName, pattern);
    }

    public void Consume(Func<Dictionary<string, object>, Task> handler)
    {
        if (_channel == null) throw new InvalidOperationException("Consumer not connected");

        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += async (_, ea) =>
        {
            var messageId = ea.BasicProperties.MessageId;
            try
            {
                var content = JsonSerializer.Deserialize<Dictionary<string, object>>(
                    Encoding.UTF8.GetString(ea.Body.ToArray()))!;

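                // x-death is added only when the broker dead-letters a message; its
                // "count" field tracks trips through the DLX, not plain requeues
                var headers = ea.BasicProperties.Headers;
                long retries = 0;
                if (headers != null && headers.TryGetValue("x-death", out var xDeath)
                    && xDeath is List<object> deaths && deaths.Count > 0
                    && deaths[0] is Dictionary<string, object> death
                    && death.TryGetValue("count", out var count))
                {
                    retries = Convert.ToInt64(count);
                }

                if (retries > _config.MaxRetries)
                {
                    Console.Error.WriteLine($"Message {messageId} exceeded max retries. Moving to DLQ.");
                    // Reject without requeue so the broker routes it to the dead-letter exchange
                    _channel.BasicNack(ea.DeliveryTag, false, requeue: false);
                    return;
                }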

                using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
                await handler(content).WaitAsync(cts.Token);

                _channel.BasicAck(ea.DeliveryTag, false);
                Console.WriteLine($"Processed order {content.GetValueOrDefault("orderId")} successfully");
            }
            catch (Exception e)
            {
                Console.Error.WriteLine($"Error processing message {messageId}: {e.Message}");
                _channel.BasicNack(ea.DeliveryTag, false, requeue: true);
            }
        };

        _channel.BasicConsume(_config.QueueName, autoAck: false, consumer: consumer);
        Console.WriteLine($"Consumer listening on queue: {_config.QueueName}");
    }

    public void Dispose()
    {
        _channel?.Close();
        _connection?.Close();
    }
}
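
The consumers above read a retry count from the `x-death` header. As a rough illustration of that header's shape: RabbitMQ adds one entry per queue and reason when it dead-letters a message, and each entry carries a `count`. The helper below is a hypothetical sketch, and `order-retry-queue` in the sample data is an assumed name for a retry queue that the article does not define.

```python
def dead_letter_count(headers, queue_name):
    """Count how many times queue_name has dead-lettered this message."""
    deaths = (headers or {}).get("x-death") or []
    return sum(
        int(entry.get("count", 0))
        for entry in deaths
        if entry.get("queue") == queue_name
    )


# Sample headers after three failed attempts (illustrative values)
sample_headers = {
    "x-death": [
        {"queue": "order-processing-queue", "reason": "rejected", "count": 3},
        {"queue": "order-retry-queue", "reason": "expired", "count": 3},
    ]
}

attempts = dead_letter_count(sample_headers, "order-processing-queue")
fresh = dead_letter_count(None, "order-processing-queue")
```

Counting entries with `len()` would report 2 here, which is the number of distinct queue and reason pairs rather than the number of attempts.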

Amazon SQS: Managed Queue Service

Amazon SQS provides two queue types: Standard and FIFO.

Standard vs FIFO Queues

Standard Queues:

  • Nearly unlimited throughput
  • At-least-once delivery (occasional duplicates are possible)
  • Best-effort ordering (not guaranteed)
  • Lower latency
  • Use for: event notifications, loosely-ordered work

FIFO Queues:

  • Ordered delivery (strictly FIFO per message group)
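  • Exactly-once processing (duplicates sent within the 5-minute deduplication interval are dropped)
  • Limited to 300 API calls per second per action without batching (up to 3,000 messages per second with batches of 10); high-throughput mode raises this limit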
  • Higher latency due to ordering guarantees
  • Use for: critical workflows, sequential processing
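
FIFO queues rely on two extra parameters on `SendMessage`: `MessageGroupId` (ordering is preserved within a group) and `MessageDeduplicationId` (duplicates with the same ID are dropped during the deduplication interval). The sketch below only builds the parameter dictionary, so it runs without AWS access. `build_fifo_send_params` and the queue URL are hypothetical, and hashing the body is just one possible way to derive a deduplication ID.

```python
import hashlib
import json


def build_fifo_send_params(queue_url: str, payload: dict, group_id: str) -> dict:
    """Build SendMessage parameters for a FIFO queue."""
    # sort_keys makes the body, and therefore the hash, independent of key order
    body = json.dumps(payload, sort_keys=True)
    return {
        "QueueUrl": queue_url,
        "MessageBody": body,
        # Messages sharing a group ID are delivered in order
        "MessageGroupId": group_id,
        # Same body -> same ID, so a retried send is treated as a duplicate
        "MessageDeduplicationId": hashlib.sha256(body.encode()).hexdigest(),
    }


params = build_fifo_send_params(
    "https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo",  # placeholder URL
    {"orderId": "ORD-12345", "amount": 149.99},
    group_id="CUST-789",
)
# With boto3, these could be passed as: boto3.client("sqs").send_message(**params)
```

Using the customer ID as the group keeps one customer's orders in sequence while letting different customers be processed in parallel.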

Visibility Timeout and Acknowledgment

When a consumer receives a message from SQS, the message stays in the queue but becomes invisible to other consumers for the visibility timeout period (30 seconds by default). The consumer acknowledges the message by deleting it. If it doesn’t delete the message before the timeout expires, SQS makes the message visible again so it can be redelivered.
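
The visibility timeout mechanism can be modeled in a few lines. This is a toy in-memory sketch, not the SQS API: `InMemoryVisibilityQueue` is hypothetical, time is passed in explicitly as `now` so the behavior is easy to follow, and messages are deleted by ID here, whereas SQS uses a receipt handle.

```python
import itertools


class InMemoryVisibilityQueue:
    """Toy model of a queue with SQS-style visibility timeouts."""

    def __init__(self, visibility_timeout=30):
        self.visibility_timeout = visibility_timeout
        self._messages = {}             # message_id -> (body, visible_at)
        self._ids = itertools.count(1)

    def send(self, body, now=0):
        message_id = next(self._ids)
        self._messages[message_id] = (body, now)
        return message_id

    def receive(self, now):
        for message_id, (body, visible_at) in self._messages.items():
            if visible_at <= now:
                # Hide the message instead of removing it
                self._messages[message_id] = (body, now + self.visibility_timeout)
                return message_id, body
        return None

    def delete(self, message_id):
        self._messages.pop(message_id, None)


q = InMemoryVisibilityQueue(visibility_timeout=30)
q.send("process_order", now=0)
first = q.receive(now=0)      # delivered, then hidden until t=30
hidden = q.receive(now=10)    # nothing visible yet
again = q.receive(now=31)     # redelivered because it was never deleted
q.delete(again[0])
gone = q.receive(now=100)     # deleted, so nothing comes back
```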

import { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';

interface OrderPayload {
  orderId: string;
  amount: number;
}

class SQSMessageQueue {
  private client: SQSClient;
  private queueUrl: string;

  constructor(region: string, queueUrl: string) {
    this.client = new SQSClient({ region });
    this.queueUrl = queueUrl;
  }

  async sendMessage(payload: OrderPayload, delaySeconds: number = 0): Promise<string> {
    const command = new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(payload),
      DelaySeconds: delaySeconds,
      MessageAttributes: {
        'OrderId': {
          StringValue: payload.orderId,
          DataType: 'String',
        },
        'Timestamp': {
          StringValue: new Date().toISOString(),
          DataType: 'String',
        },
      },
    });

    const response = await this.client.send(command);
    return response.MessageId!;
  }

  async receiveMessages(maxMessages: number = 10, visibilityTimeout: number = 30) {
    const command = new ReceiveMessageCommand({
      QueueUrl: this.queueUrl,
      MaxNumberOfMessages: maxMessages,
      VisibilityTimeout: visibilityTimeout,
      WaitTimeSeconds: 20, // Long polling
      MessageAttributeNames: ['All'],
    });

    const response = await this.client.send(command);
    return response.Messages || [];
  }

  async deleteMessage(receiptHandle: string): Promise<void> {
    const command = new DeleteMessageCommand({
      QueueUrl: this.queueUrl,
      ReceiptHandle: receiptHandle,
    });

    await this.client.send(command);
  }
}

// Consumer implementation
class SQSConsumer {
  private queue: SQSMessageQueue;
  private isRunning: boolean = false;

  constructor(queue: SQSMessageQueue) {
    this.queue = queue;
  }

  async start(handler: (payload: OrderPayload) => Promise<void>): Promise<void> {
    this.isRunning = true;

    while (this.isRunning) {
      try {
        // Messages in a batch are processed one after another, so the visibility
        // timeout must cover the whole batch: 10 messages x 50 s each, plus margin
        const messages = await this.queue.receiveMessages(10, 600);

        if (messages.length === 0) {
          continue;
        }

        for (const message of messages) {
          try {
            const payload = JSON.parse(message.Body!) as OrderPayload;

            // Process with timeout
            await Promise.race([
              handler(payload),
              new Promise((_, reject) =>
                setTimeout(
                  () => reject(new Error('Processing timeout')),
                  50000 // 50 second timeout per message
                )
              ),
            ]);

            // Delete message on success
            await this.queue.deleteMessage(message.ReceiptHandle!);
            console.log(`Processed order ${payload.orderId}`);
          } catch (error) {
            console.error(
              `Error processing message ${message.MessageId}:`,
              error
            );
            // Don't delete; message will reappear after visibility timeout
          }
        }
      } catch (error) {
        console.error('Error in consumer loop:', error);
        await new Promise((resolve) => setTimeout(resolve, 5000));
      }
    }
  }

  stop(): void {
    this.isRunning = false;
  }
}

// SQSMessageQueue.java
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
import software.amazon.awssdk.regions.Region;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.util.Map;

public class SQSMessageQueue {
    private final SqsClient client;
    private final String queueUrl;
    private static final ObjectMapper mapper = new ObjectMapper();

    public record OrderPayload(String orderId, double amount) {}

    public SQSMessageQueue(String region, String queueUrl) {
        this.client = SqsClient.builder().region(Region.of(region)).build();
        this.queueUrl = queueUrl;
    }

    public String sendMessage(OrderPayload payload, int delaySeconds) throws Exception {
        SendMessageResponse response = client.sendMessage(SendMessageRequest.builder()
            .queueUrl(queueUrl)
            .messageBody(mapper.writeValueAsString(payload))
            .delaySeconds(delaySeconds)
            .messageAttributes(Map.of(
                "OrderId", MessageAttributeValue.builder().stringValue(payload.orderId()).dataType("String").build(),
                "Timestamp", MessageAttributeValue.builder().stringValue(java.time.Instant.now().toString()).dataType("String").build()
            ))
            .build());
        return response.messageId();
    }

    public List<Message> receiveMessages(int maxMessages, int visibilityTimeout) {
        ReceiveMessageResponse response = client.receiveMessage(ReceiveMessageRequest.builder()
            .queueUrl(queueUrl)
            .maxNumberOfMessages(maxMessages)
            .visibilityTimeout(visibilityTimeout)
            .waitTimeSeconds(20) // Long polling
            .messageAttributeNames("All")
            .build());
        return response.messages();
    }

    public void deleteMessage(String receiptHandle) {
        client.deleteMessage(DeleteMessageRequest.builder()
            .queueUrl(queueUrl)
            .receiptHandle(receiptHandle)
            .build());
    }
}

// SQSConsumer.java
class SQSConsumer {
    private final SQSMessageQueue queue;
    private volatile boolean isRunning = false;

    public SQSConsumer(SQSMessageQueue queue) { this.queue = queue; }

    public void start(java.util.function.Consumer<SQSMessageQueue.OrderPayload> handler) {
        isRunning = true;

        while (isRunning) {
            try {
                List<Message> messages = queue.receiveMessages(10, 60);

                for (Message message : messages) {
                    try {
                        SQSMessageQueue.OrderPayload payload = new ObjectMapper()
                            .readValue(message.body(), SQSMessageQueue.OrderPayload.class);
                        handler.accept(payload);
                        queue.deleteMessage(message.receiptHandle());
                        System.out.println("Processed order " + payload.orderId());
                    } catch (Exception e) {
                        System.err.println("Error processing message " + message.messageId() + ": " + e.getMessage());
                        // Don't delete; message will reappear after visibility timeout
                    }
                }
            } catch (Exception e) {
                System.err.println("Error in consumer loop: " + e.getMessage());
                try { Thread.sleep(5000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
            }
        }
    }

    public void stop() { isRunning = false; }
}
# sqs_message_queue.py
import boto3
import json
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Awaitable


@dataclass
class OrderPayload:
    order_id: str
    amount: float


class SQSMessageQueue:
    def __init__(self, region: str, queue_url: str):
        self.client = boto3.client("sqs", region_name=region)
        self.queue_url = queue_url

    def send_message(self, payload: OrderPayload, delay_seconds: int = 0) -> str:
        response = self.client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps({"order_id": payload.order_id, "amount": payload.amount}),
            DelaySeconds=delay_seconds,
            MessageAttributes={
                "OrderId": {"StringValue": payload.order_id, "DataType": "String"},
                "Timestamp": {"StringValue": datetime.now(timezone.utc).isoformat(), "DataType": "String"},
            },
        )
        return response["MessageId"]

    def receive_messages(self, max_messages: int = 10, visibility_timeout: int = 30) -> list:
        response = self.client.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=max_messages,
            VisibilityTimeout=visibility_timeout,
            WaitTimeSeconds=20,  # Long polling
            MessageAttributeNames=["All"],
        )
        return response.get("Messages", [])

    def delete_message(self, receipt_handle: str) -> None:
        self.client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=receipt_handle)


class SQSConsumer:
    def __init__(self, queue: SQSMessageQueue):
        self.queue = queue
        self.is_running = False

    async def start(self, handler: Callable[[OrderPayload], Awaitable[None]]) -> None:
        self.is_running = True

        while self.is_running:
            try:
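                # boto3 is synchronous; run the long poll in a worker thread so it
                # does not block the event loop (and other consumers) for up to 20 s.
                messages = await asyncio.to_thread(self.queue.receive_messages, 10, 60)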

                for message in messages:
                    try:
                        data = json.loads(message["Body"])
                        payload = OrderPayload(order_id=data["order_id"], amount=data["amount"])

                        await asyncio.wait_for(handler(payload), timeout=50)

                        # delete_message is a blocking boto3 call; keep it off the event loop.
                        await asyncio.to_thread(self.queue.delete_message, message["ReceiptHandle"])
                        print(f"Processed order {payload.order_id}")
                    except Exception as e:
                        print(f"Error processing message {message['MessageId']}: {e}")
                        # Don't delete; message will reappear after visibility timeout

            except Exception as e:
                print(f"Error in consumer loop: {e}")
                await asyncio.sleep(5)

    def stop(self) -> None:
        self.is_running = False
// SQSMessageQueue.cs
using Amazon.SQS;
using Amazon.SQS.Model;
using System.Text.Json;

public record OrderPayload(string OrderId, double Amount);

public class SQSMessageQueue
{
    private readonly AmazonSQSClient _client;
    private readonly string _queueUrl;

    public SQSMessageQueue(string region, string queueUrl)
    {
        _client = new AmazonSQSClient(Amazon.RegionEndpoint.GetBySystemName(region));
        _queueUrl = queueUrl;
    }

    public async Task<string> SendMessageAsync(OrderPayload payload, int delaySeconds = 0)
    {
        var response = await _client.SendMessageAsync(new SendMessageRequest
        {
            QueueUrl = _queueUrl,
            MessageBody = JsonSerializer.Serialize(payload),
            DelaySeconds = delaySeconds,
            MessageAttributes = new Dictionary<string, MessageAttributeValue>
            {
                ["OrderId"] = new() { StringValue = payload.OrderId, DataType = "String" },
                ["Timestamp"] = new() { StringValue = DateTime.UtcNow.ToString("O"), DataType = "String" },
            }
        });
        return response.MessageId;
    }

    public async Task<List<Message>> ReceiveMessagesAsync(int maxMessages = 10, int visibilityTimeout = 30)
    {
        var response = await _client.ReceiveMessageAsync(new ReceiveMessageRequest
        {
            QueueUrl = _queueUrl,
            MaxNumberOfMessages = maxMessages,
            VisibilityTimeout = visibilityTimeout,
            WaitTimeSeconds = 20,
            MessageAttributeNames = new List<string> { "All" },
        });
        return response.Messages;
    }

    public async Task DeleteMessageAsync(string receiptHandle) =>
        await _client.DeleteMessageAsync(_queueUrl, receiptHandle);
}

public class SQSConsumer
{
    private readonly SQSMessageQueue _queue;
    private volatile bool _isRunning; // written by Stop(), read by the polling loop

    public SQSConsumer(SQSMessageQueue queue) { _queue = queue; }

    public async Task StartAsync(Func<OrderPayload, Task> handler)
    {
        _isRunning = true;

        while (_isRunning)
        {
            try
            {
                var messages = await _queue.ReceiveMessagesAsync(10, 60);

                foreach (var message in messages)
                {
                    try
                    {
                        var payload = JsonSerializer.Deserialize<OrderPayload>(message.Body)!;

                        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(50));
                        await handler(payload).WaitAsync(cts.Token);

                        await _queue.DeleteMessageAsync(message.ReceiptHandle);
                        Console.WriteLine($"Processed order {payload.OrderId}");
                    }
                    catch (Exception e)
                    {
                        Console.Error.WriteLine($"Error processing message {message.MessageId}: {e.Message}");
                        // Don't delete; message will reappear after visibility timeout
                    }
                }
            }
            catch (Exception e)
            {
                Console.Error.WriteLine($"Error in consumer loop: {e.Message}");
                await Task.Delay(5000);
            }
        }
    }

    public void Stop() => _isRunning = false;
}
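One caveat applies to the consumers above: each batch of up to 10 messages is handled one at a time, while the 60-second visibility timeout starts for the whole batch when it is received. If several handlers run close to their 50-second limit, later messages in the batch can become visible again before they are processed. The sketch below shows one possible way around this, assuming the handlers are independent and safe to run concurrently. `process_batch` and the `on_success` callback are hypothetical names, not part of the classes above.

```python
import asyncio
from typing import Awaitable, Callable


async def process_batch(
    messages: list[dict],
    handler: Callable[[dict], Awaitable[None]],
    on_success: Callable[[dict], Awaitable[None]],
    per_message_timeout: float = 50.0,
) -> list[bool]:
    """Handle every message in the batch concurrently.

    Returns one flag per message: True if it was handled and acknowledged.
    """

    async def handle_one(message: dict) -> bool:
        try:
            await asyncio.wait_for(handler(message), timeout=per_message_timeout)
            await on_success(message)  # e.g. delete the message from the queue
            return True
        except Exception as exc:
            # Leave the message alone; it reappears after the visibility timeout.
            print(f"Error processing message {message.get('MessageId')}: {exc!r}")
            return False

    # gather() keeps results in the same order as the input messages.
    return list(await asyncio.gather(*(handle_one(m) for m in messages)))


async def demo_batch() -> list[bool]:
    deleted: list[str] = []

    async def handler(message: dict) -> None:
        if message["Body"] == "bad":
            raise ValueError("cannot process")
        await asyncio.sleep(0.01)  # simulate work

    async def delete(message: dict) -> None:
        deleted.append(message["MessageId"])

    batch = [
        {"MessageId": "1", "Body": "ok"},
        {"MessageId": "2", "Body": "bad"},
        {"MessageId": "3", "Body": "ok"},
    ]
    return await process_batch(batch, handler, delete)
```

With this shape, the slowest handler in the batch bounds the total time rather than the sum of all handlers, so a 50-second per-message limit stays inside a 60-second visibility timeout.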

Message Patterns

Work Queue Pattern

Multiple consumers process tasks from a single queue in parallel.

// Producer
const queue = new SQSMessageQueue('us-east-1', 'https://sqs.us-east-1.amazonaws.com/123456789/task-queue');

for (let i = 0; i < 100; i++) {
  await queue.sendMessage({
    orderId: `TASK-${i}`,
    amount: Math.random() * 1000,
  });
}

// Multiple consumers
const consumer = new SQSConsumer(queue);

const handler = async (payload: OrderPayload) => {
  console.log(`Processing ${payload.orderId}`);
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 5000));
};

// Start 5 parallel polling loops; await so a rejected loop surfaces as an error
await Promise.all([...Array(5)].map(() => consumer.start(handler)));
// Work Queue Pattern - Producer
SQSMessageQueue queue = new SQSMessageQueue("us-east-1",
    "https://sqs.us-east-1.amazonaws.com/123456789/task-queue");

for (int i = 0; i < 100; i++) {
    queue.sendMessage(new SQSMessageQueue.OrderPayload("TASK-" + i, Math.random() * 1000), 0);
}

// Work Queue Pattern - Multiple consumers via thread pool
SQSConsumer consumer = new SQSConsumer(queue);

java.util.function.Consumer<SQSMessageQueue.OrderPayload> handler = payload -> {
    System.out.println("Processing " + payload.orderId());
    try { Thread.sleep((long)(Math.random() * 5000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
};

// Start 5 parallel consumers
java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
    executor.submit(() -> consumer.start(handler));
}
# Work Queue Pattern - Producer
import asyncio
import random

queue = SQSMessageQueue("us-east-1",
    "https://sqs.us-east-1.amazonaws.com/123456789/task-queue")

for i in range(100):
    queue.send_message(OrderPayload(order_id=f"TASK-{i}", amount=random.random() * 1000))

# Multiple consumers
consumer = SQSConsumer(queue)

async def handler(payload: OrderPayload):
    print(f"Processing {payload.order_id}")
    await asyncio.sleep(random.random() * 5)

# Start 5 parallel consumers
async def main():
    await asyncio.gather(*[consumer.start(handler) for _ in range(5)])

asyncio.run(main())
// Work Queue Pattern - Producer
var queue = new SQSMessageQueue("us-east-1",
    "https://sqs.us-east-1.amazonaws.com/123456789/task-queue");

for (int i = 0; i < 100; i++)
    await queue.SendMessageAsync(new OrderPayload($"TASK-{i}", Random.Shared.NextDouble() * 1000));

// Multiple consumers
var consumer = new SQSConsumer(queue);

Func<OrderPayload, Task> handler = async payload =>
{
    Console.WriteLine($"Processing {payload.OrderId}");
    await Task.Delay((int)(Random.Shared.NextDouble() * 5000));
};

// Start 5 parallel consumers
await Task.WhenAll(Enumerable.Range(0, 5).Select(_ => consumer.StartAsync(handler)));
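The competing-consumers behaviour can also be observed without an AWS account by modelling the pattern in memory. This is only a sketch: `asyncio.Queue` stands in for SQS, and `run_work_queue` and `worker` are illustrative names. Each task is taken by exactly one worker, and `task_done()` plays the role of deleting the message.

```python
import asyncio


async def run_work_queue(num_tasks: int = 20, num_workers: int = 5) -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    for task_id in range(num_tasks):
        queue.put_nowait(task_id)

    # Record which worker handled which task.
    processed = {f"worker-{i}": [] for i in range(num_workers)}

    async def worker(name: str) -> None:
        while True:
            task_id = await queue.get()
            try:
                await asyncio.sleep(0.001)  # simulate work
                processed[name].append(task_id)
            finally:
                queue.task_done()  # in-memory equivalent of deleting the message

    workers = [asyncio.create_task(worker(name)) for name in processed]
    await queue.join()  # wait until every task has been acknowledged
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return processed
```

Calling `asyncio.run(run_work_queue())` returns a mapping from worker name to the task IDs it handled; no ID appears under more than one worker.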

Pub/Sub Fanout Pattern

One exchange copies each message to every bound queue, and each queue is consumed by a different service.

// In RabbitMQ, create fanout exchange
async function setupFanout(channel: amqp.Channel) {
  const exchangeName = 'user-events-fanout';
  
  // Fanout: broadcast to all bound queues
  await channel.assertExchange(exchangeName, 'fanout', { durable: true });

  // Email service queue
  await channel.assertQueue('email-queue', { durable: true });
  await channel.bindQueue('email-queue', exchangeName, '');

  // Analytics queue
  await channel.assertQueue('analytics-queue', { durable: true });
  await channel.bindQueue('analytics-queue', exchangeName, '');

  // Notification queue
  await channel.assertQueue('notification-queue', { durable: true });
  await channel.bindQueue('notification-queue', exchangeName, '');

  return exchangeName;
}

// Single user registration message triggers three consumers
const exchangeName = await setupFanout(channel);
await channel.publish(
  exchangeName,
  '', // Fanout ignores routing key
  Buffer.from(JSON.stringify({ userId: 'user-123', email: 'user@example.com' })),
  { persistent: true }
);
// Pub/Sub Fanout Pattern
import com.rabbitmq.client.Channel;
import com.fasterxml.jackson.databind.ObjectMapper;

public String setupFanout(Channel channel) throws Exception {
    String exchangeName = "user-events-fanout";

    // Fanout: broadcast to all bound queues
    channel.exchangeDeclare(exchangeName, "fanout", true);

    channel.queueDeclare("email-queue", true, false, false, null);
    channel.queueBind("email-queue", exchangeName, "");

    channel.queueDeclare("analytics-queue", true, false, false, null);
    channel.queueBind("analytics-queue", exchangeName, "");

    channel.queueDeclare("notification-queue", true, false, false, null);
    channel.queueBind("notification-queue", exchangeName, "");

    return exchangeName;
}

// Publish fanout message
String exchangeName = setupFanout(channel);
byte[] body = new ObjectMapper().writeValueAsBytes(
    java.util.Map.of("userId", "user-123", "email", "user@example.com")
);
com.rabbitmq.client.AMQP.BasicProperties props =
    new com.rabbitmq.client.AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish(exchangeName, "", props, body); // Fanout ignores routing key
# Pub/Sub Fanout Pattern
import aio_pika
import json


async def setup_fanout(channel: aio_pika.Channel) -> aio_pika.Exchange:
    exchange_name = "user-events-fanout"

    # Fanout: broadcast to all bound queues
    exchange = await channel.declare_exchange(
        exchange_name, aio_pika.ExchangeType.FANOUT, durable=True
    )

    for queue_name in ("email-queue", "analytics-queue", "notification-queue"):
        queue = await channel.declare_queue(queue_name, durable=True)
        await queue.bind(exchange, routing_key="")  # Fanout ignores routing key

    return exchange


# Publish fanout message
exchange = await setup_fanout(channel)
await exchange.publish(
    aio_pika.Message(
        body=json.dumps({"user_id": "user-123", "email": "user@example.com"}).encode(),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
    ),
    routing_key="",  # Fanout ignores routing key
)
// Pub/Sub Fanout Pattern
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

static string SetupFanout(IModel channel)
{
    const string exchangeName = "user-events-fanout";

    // Fanout: broadcast to all bound queues
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, durable: true);

    foreach (var queueName in new[] { "email-queue", "analytics-queue", "notification-queue" })
    {
        channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(queueName, exchangeName, routingKey: ""); // Fanout ignores routing key
    }

    return exchangeName;
}

// Publish fanout message
var exchangeName = SetupFanout(channel);
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { userId = "user-123", email = "user@example.com" }));
var props = channel.CreateBasicProperties();
props.Persistent = true;
channel.BasicPublish(exchangeName, routingKey: "", props, body); // Fanout ignores routing key

Topic-Based Routing Pattern

Route messages based on hierarchical routing keys (e.g., orders.processing.urgent, orders.completed.standard).

async function setupTopicRouting(channel: amqp.Channel) {
  const exchangeName = 'orders-topic';
  
  await channel.assertExchange(exchangeName, 'topic', { durable: true });

  // Queue for all order updates
  await channel.assertQueue('all-orders-queue', { durable: true });
  await channel.bindQueue('all-orders-queue', exchangeName, 'orders.#'); // All events

  // Queue for urgent orders only
  await channel.assertQueue('urgent-orders-queue', { durable: true });
  await channel.bindQueue('urgent-orders-queue', exchangeName, 'orders.*.urgent');

  // Queue for processing orders
  await channel.assertQueue('processing-queue', { durable: true });
  await channel.bindQueue('processing-queue', exchangeName, 'orders.processing.*');

  return exchangeName;
}

// Publish with topic routing
await channel.publish(
  'orders-topic',
  'orders.processing.urgent',
  Buffer.from(JSON.stringify({ orderId: '12345', priority: 'urgent' })),
  { persistent: true }
);

// 'urgent-orders-queue' and 'processing-queue' both receive this message
// 'all-orders-queue' also receives it
// Topic-Based Routing Pattern
import com.rabbitmq.client.Channel;
import com.fasterxml.jackson.databind.ObjectMapper;

public String setupTopicRouting(Channel channel) throws Exception {
    String exchangeName = "orders-topic";

    channel.exchangeDeclare(exchangeName, "topic", true);

    channel.queueDeclare("all-orders-queue", true, false, false, null);
    channel.queueBind("all-orders-queue", exchangeName, "orders.#"); // All events

    channel.queueDeclare("urgent-orders-queue", true, false, false, null);
    channel.queueBind("urgent-orders-queue", exchangeName, "orders.*.urgent");

    channel.queueDeclare("processing-queue", true, false, false, null);
    channel.queueBind("processing-queue", exchangeName, "orders.processing.*");

    return exchangeName;
}

// Publish with topic routing
byte[] body = new ObjectMapper().writeValueAsBytes(
    java.util.Map.of("orderId", "12345", "priority", "urgent")
);
com.rabbitmq.client.AMQP.BasicProperties props =
    new com.rabbitmq.client.AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish("orders-topic", "orders.processing.urgent", props, body);
// 'urgent-orders-queue' and 'processing-queue' both receive this
// 'all-orders-queue' also receives it
# Topic-Based Routing Pattern
import aio_pika
import json


async def setup_topic_routing(channel: aio_pika.Channel) -> aio_pika.Exchange:
    exchange_name = "orders-topic"

    exchange = await channel.declare_exchange(
        exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
    )

    bindings = [
        ("all-orders-queue", "orders.#"),       # All events
        ("urgent-orders-queue", "orders.*.urgent"),
        ("processing-queue", "orders.processing.*"),
    ]

    for queue_name, routing_key in bindings:
        queue = await channel.declare_queue(queue_name, durable=True)
        await queue.bind(exchange, routing_key=routing_key)

    return exchange


# Publish with topic routing
exchange = await setup_topic_routing(channel)
await exchange.publish(
    aio_pika.Message(
        body=json.dumps({"order_id": "12345", "priority": "urgent"}).encode(),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
    ),
    routing_key="orders.processing.urgent",
)
# 'urgent-orders-queue' and 'processing-queue' both receive this
# 'all-orders-queue' also receives it
// Topic-Based Routing Pattern
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

static string SetupTopicRouting(IModel channel)
{
    const string exchangeName = "orders-topic";

    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, durable: true);

    var bindings = new[]
    {
        ("all-orders-queue",      "orders.#"),           // All events
        ("urgent-orders-queue",   "orders.*.urgent"),
        ("processing-queue",      "orders.processing.*"),
    };

    foreach (var (queueName, routingKey) in bindings)
    {
        channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(queueName, exchangeName, routingKey);
    }

    return exchangeName;
}

// Publish with topic routing
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { orderId = "12345", priority = "urgent" }));
var props = channel.CreateBasicProperties();
props.Persistent = true;
channel.BasicPublish("orders-topic", "orders.processing.urgent", props, body);
// 'urgent-orders-queue' and 'processing-queue' both receive this
// 'all-orders-queue' also receives it
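The binding patterns above follow the AMQP topic rules: `*` matches exactly one dot-separated word and `#` matches zero or more words. The matcher below is a simplified illustration of those rules for experimenting with patterns. It is not how RabbitMQ implements routing, and `topic_matches` and `queues_for` are hypothetical helpers.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP-style topic pattern."""
    p_words = pattern.split(".")
    k_words = routing_key.split(".") if routing_key else []

    def match(pi: int, ki: int) -> bool:
        if pi == len(p_words):
            return ki == len(k_words)
        if p_words[pi] == "#":
            # '#' may absorb zero or more words: try every possible split.
            return any(match(pi + 1, k) for k in range(ki, len(k_words) + 1))
        if ki == len(k_words):
            return False
        if p_words[pi] == "*" or p_words[pi] == k_words[ki]:
            return match(pi + 1, ki + 1)
        return False

    return match(0, 0)


# Same bindings as in the examples above.
bindings = {
    "all-orders-queue": "orders.#",
    "urgent-orders-queue": "orders.*.urgent",
    "processing-queue": "orders.processing.*",
}


def queues_for(routing_key: str) -> list:
    """List the queues whose binding pattern matches the routing key."""
    return sorted(q for q, pattern in bindings.items() if topic_matches(pattern, routing_key))
```

For example, `queues_for("orders.processing.urgent")` lists all three queues, while `queues_for("orders.completed.standard")` lists only `all-orders-queue`.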

Dead-Letter Queues and Retry Strategies

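Dead-letter queues (DLQs) capture messages that still fail after the maximum number of retries, so they can be inspected or replayed instead of being lost or blocking the main queue.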

interface RetryConfig {
  maxRetries: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
}

class MessageProcessor {
  private config: RetryConfig;

  constructor(config: RetryConfig) {
    this.config = config;
  }

  async setupRabbitMQDLQ(channel: amqp.Channel): Promise<void> {
    const mainExchange = 'orders';
    const dlxExchange = `${mainExchange}-dlx`;
    const dlqName = 'orders-dlq';

    // Dead-letter exchange
    await channel.assertExchange(dlxExchange, 'direct', { durable: true });

    // Dead-letter queue
    await channel.assertQueue(dlqName, { durable: true });
    await channel.bindQueue(dlqName, dlxExchange, 'orders');

    // Main queue with DLX configured
    await channel.assertQueue('orders-queue', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': dlxExchange,
        'x-dead-letter-routing-key': 'orders',
        'x-max-length': 1000000, // Prevent unbounded growth
      },
    });
  }

  calculateBackoffDelay(retryCount: number): number {
    const exponentialDelay = this.config.initialDelayMs * 
      Math.pow(this.config.backoffMultiplier, retryCount);
    
    return Math.min(exponentialDelay, this.config.maxDelayMs);
  }

  async processWithRetry<T>(
    handler: (data: T) => Promise<void>,
    data: T,
    retryCount: number = 0
  ): Promise<void> {
    try {
      await handler(data);
    } catch (error) {
      if (retryCount < this.config.maxRetries) {
        const delayMs = this.calculateBackoffDelay(retryCount);
        console.log(
          `Retry ${retryCount + 1}/${this.config.maxRetries} after ${delayMs}ms`
        );
        
        await new Promise((resolve) => setTimeout(resolve, delayMs));
        await this.processWithRetry(handler, data, retryCount + 1);
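      } else {
        console.error(
          `Failed after ${this.config.maxRetries} retries. Rejecting so the broker can dead-letter it.`
        );
        // Rethrow: the caller must reject without requeue (e.g. channel.nack(msg, false, false))
        // for RabbitMQ to route the message to the dead-letter exchange.
        throw error;
      }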
    }
  }
}

// Usage
const processor = new MessageProcessor({
  maxRetries: 3,
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffMultiplier: 2,
});

await processor.processWithRetry(
  async (order: OrderMessage) => {
    // Process logic
    if (Math.random() < 0.3) throw new Error('Temporary failure');
    console.log('Order processed:', order.orderId);
  },
  { orderId: 'ORD-123', customerId: 'CUST-456', total: 99.99, timestamp: new Date() }
);
// MessageProcessor.java
import com.rabbitmq.client.Channel;
import java.util.Map;
import java.util.function.Consumer;

public class MessageProcessor {

    public record RetryConfig(int maxRetries, long initialDelayMs, long maxDelayMs, double backoffMultiplier) {}

    private final RetryConfig config;

    public MessageProcessor(RetryConfig config) { this.config = config; }

    public void setupRabbitMQDLQ(Channel channel) throws Exception {
        String mainExchange = "orders";
        String dlxExchange = mainExchange + "-dlx";
        String dlqName = "orders-dlq";

        channel.exchangeDeclare(dlxExchange, "direct", true);
        channel.queueDeclare(dlqName, true, false, false, null);
        channel.queueBind(dlqName, dlxExchange, "orders");

        channel.queueDeclare("orders-queue", true, false, false, Map.of(
            "x-dead-letter-exchange", dlxExchange,
            "x-dead-letter-routing-key", "orders",
            "x-max-length", 1_000_000
        ));
    }

    public long calculateBackoffDelay(int retryCount) {
        long exponentialDelay = (long)(config.initialDelayMs() *
            Math.pow(config.backoffMultiplier(), retryCount));
        return Math.min(exponentialDelay, config.maxDelayMs());
    }

    public <T> void processWithRetry(Consumer<T> handler, T data, int retryCount) throws Exception {
        try {
            handler.accept(data);
        } catch (Exception e) {
            if (retryCount < config.maxRetries()) {
                long delayMs = calculateBackoffDelay(retryCount);
                System.out.printf("Retry %d/%d after %dms%n", retryCount + 1, config.maxRetries(), delayMs);
                Thread.sleep(delayMs);
                processWithRetry(handler, data, retryCount + 1);
            } else {
                System.err.printf("Failed after %d retries. Moving to DLQ.%n", config.maxRetries());
                throw e;
            }
        }
    }

    // Usage
    public static void main(String[] args) throws Exception {
        MessageProcessor processor = new MessageProcessor(
            new RetryConfig(3, 1000, 30000, 2.0)
        );

        processor.processWithRetry(order -> {
            if (Math.random() < 0.3) throw new RuntimeException("Temporary failure");
            System.out.println("Order processed: " + order);
        }, "ORD-123", 0);
    }
}
# message_processor.py
import asyncio
import random
from dataclasses import dataclass
from typing import Callable, TypeVar, Awaitable

import aio_pika

T = TypeVar("T")


@dataclass
class RetryConfig:
    max_retries: int
    initial_delay_ms: int
    max_delay_ms: int
    backoff_multiplier: float


class MessageProcessor:
    def __init__(self, config: RetryConfig):
        self.config = config

    async def setup_rabbitmq_dlq(self, channel: aio_pika.Channel) -> None:
        main_exchange = "orders"
        dlx_exchange = f"{main_exchange}-dlx"
        dlq_name = "orders-dlq"

        dead_exchange = await channel.declare_exchange(dlx_exchange, aio_pika.ExchangeType.DIRECT, durable=True)
        dlq = await channel.declare_queue(dlq_name, durable=True)
        await dlq.bind(dead_exchange, routing_key="orders")

        await channel.declare_queue("orders-queue", durable=True, arguments={
            "x-dead-letter-exchange": dlx_exchange,
            "x-dead-letter-routing-key": "orders",
            "x-max-length": 1_000_000,
        })

    def calculate_backoff_delay(self, retry_count: int) -> float:
        delay = self.config.initial_delay_ms * (self.config.backoff_multiplier ** retry_count)
        return min(delay, self.config.max_delay_ms) / 1000  # return seconds

    async def process_with_retry(
        self,
        handler: Callable[[T], Awaitable[None]],
        data: T,
        retry_count: int = 0,
    ) -> None:
        try:
            await handler(data)
        except Exception as e:
            if retry_count < self.config.max_retries:
                delay = self.calculate_backoff_delay(retry_count)
                print(f"Retry {retry_count + 1}/{self.config.max_retries} after {delay*1000:.0f}ms")
                await asyncio.sleep(delay)
                await self.process_with_retry(handler, data, retry_count + 1)
            else:
                print(f"Failed after {self.config.max_retries} retries. Moving to DLQ.")
                raise


# Usage
async def main():
    processor = MessageProcessor(RetryConfig(
        max_retries=3, initial_delay_ms=1000, max_delay_ms=30000, backoff_multiplier=2.0
    ))

    async def handle_order(order: dict):
        if random.random() < 0.3:
            raise ValueError("Temporary failure")
        print(f"Order processed: {order['order_id']}")

    await processor.process_with_retry(
        handle_order,
        {"order_id": "ORD-123", "customer_id": "CUST-456", "total": 99.99}
    )

asyncio.run(main())
// MessageProcessor.cs
using RabbitMQ.Client;

public record RetryConfig(int MaxRetries, int InitialDelayMs, int MaxDelayMs, double BackoffMultiplier);

public class MessageProcessor
{
    private readonly RetryConfig _config;

    public MessageProcessor(RetryConfig config) { _config = config; }

    public void SetupRabbitMQDLQ(IModel channel)
    {
        const string mainExchange = "orders";
        var dlxExchange = $"{mainExchange}-dlx";
        const string dlqName = "orders-dlq";

        channel.ExchangeDeclare(dlxExchange, ExchangeType.Direct, durable: true);
        channel.QueueDeclare(dlqName, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(dlqName, dlxExchange, "orders");

        channel.QueueDeclare("orders-queue", durable: true, exclusive: false, autoDelete: false,
            arguments: new Dictionary<string, object>
            {
                ["x-dead-letter-exchange"] = dlxExchange,
                ["x-dead-letter-routing-key"] = "orders",
                ["x-max-length"] = 1_000_000,
            });
    }

    public int CalculateBackoffDelay(int retryCount)
    {
        var exponentialDelay = _config.InitialDelayMs * Math.Pow(_config.BackoffMultiplier, retryCount);
        // Clamp before casting so a large retryCount cannot overflow int.
        return (int)Math.Min(exponentialDelay, _config.MaxDelayMs);
    }

    public async Task ProcessWithRetryAsync<T>(Func<T, Task> handler, T data, int retryCount = 0)
    {
        try
        {
            await handler(data);
        }
        catch (Exception) when (retryCount < _config.MaxRetries)
        {
            int delayMs = CalculateBackoffDelay(retryCount);
            Console.WriteLine($"Retry {retryCount + 1}/{_config.MaxRetries} after {delayMs}ms");
            await Task.Delay(delayMs);
            await ProcessWithRetryAsync(handler, data, retryCount + 1);
        }
        catch
        {
            Console.Error.WriteLine($"Failed after {_config.MaxRetries} retries. Moving to DLQ.");
            throw;
        }
    }
}

// Usage
var processor = new MessageProcessor(new RetryConfig(3, 1000, 30000, 2.0));

await processor.ProcessWithRetryAsync(async (order) =>
{
    if (Random.Shared.NextDouble() < 0.3) throw new Exception("Temporary failure");
    Console.WriteLine($"Order processed: {order}");
}, new { orderId = "ORD-123", customerId = "CUST-456", total = 99.99 });
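The retry helpers above wait a deterministic delay, so many consumers that fail at the same moment will also retry at the same moment. A common refinement is to add random jitter. The sketch below shows a "full jitter" variant using the same values as the RetryConfig examples. `capped_delay_ms`, `jittered_delay_ms`, and the `rng` parameter are illustrative additions, not part of the classes above.

```python
import random
from typing import Optional


def capped_delay_ms(
    retry_count: int,
    initial_delay_ms: int = 1000,
    backoff_multiplier: float = 2.0,
    max_delay_ms: int = 30000,
) -> float:
    """Deterministic capped exponential delay, as in calculate_backoff_delay."""
    return min(initial_delay_ms * (backoff_multiplier ** retry_count), max_delay_ms)


def jittered_delay_ms(retry_count: int, rng: Optional[random.Random] = None, **config) -> float:
    """Full jitter: a uniform random delay between 0 and the capped delay."""
    if rng is None:
        rng = random.Random()
    return rng.uniform(0, capped_delay_ms(retry_count, **config))


# Upper bounds for retries 0..5 with the default configuration.
schedule = [capped_delay_ms(n) for n in range(6)]
```

Here `schedule` holds the upper bound for each attempt (1000, 2000, 4000, 8000, 16000, then capped at 30000 ms), and `jittered_delay_ms` picks a random wait below that bound so retries from different consumers spread out.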

Idempotency: Handling Duplicate Messages

Idempotent consumers can safely process the same message multiple times without adverse effects.

interface IdempotencyStore {
  isProcessed(messageId: string): Promise<boolean>;
  markProcessed(messageId: string, expiryMs: number): Promise<void>;
}

class RedisIdempotencyStore implements IdempotencyStore {
  constructor(private redisClient: any) {}

  async isProcessed(messageId: string): Promise<boolean> {
    const key = `processed:${messageId}`;
    const result = await this.redisClient.get(key);
    return result !== null;
  }

  async markProcessed(messageId: string, expiryMs: number = 86400000): Promise<void> {
    const key = `processed:${messageId}`;
    await this.redisClient.setex(key, Math.ceil(expiryMs / 1000), '1');
  }
}

class IdempotentConsumer {
  constructor(
    private queue: RabbitMQConsumer,
    private idempotencyStore: IdempotencyStore
  ) {}

  async consume(
    handler: (message: OrderMessage) => Promise<void>
  ): Promise<void> {
    await this.queue.consume(async (message: OrderMessage) => {
      // Check idempotency
      const alreadyProcessed = await this.idempotencyStore.isProcessed(
        message.orderId
      );

      if (alreadyProcessed) {
        console.log(`Message ${message.orderId} already processed. Skipping.`);
        return; // Silently skip
      }

      // Process
      await handler(message);

      // Mark as processed
      await this.idempotencyStore.markProcessed(message.orderId);
    });
  }
}
// IdempotencyStore.java
import redis.clients.jedis.Jedis;

public interface IdempotencyStore {
    boolean isProcessed(String messageId);
    void markProcessed(String messageId, long expiryMs);
}

public class RedisIdempotencyStore implements IdempotencyStore {
    private final Jedis redis;

    public RedisIdempotencyStore(Jedis redis) { this.redis = redis; }

    @Override
    public boolean isProcessed(String messageId) {
        return redis.get("processed:" + messageId) != null;
    }

    @Override
    public void markProcessed(String messageId, long expiryMs) {
        redis.setex("processed:" + messageId, (int) Math.ceil(expiryMs / 1000.0), "1");
    }
}

public class IdempotentConsumer {
    private final RabbitMQConsumer queue;
    private final IdempotencyStore idempotencyStore;

    public IdempotentConsumer(RabbitMQConsumer queue, IdempotencyStore store) {
        this.queue = queue;
        this.idempotencyStore = store;
    }

    public void consume(java.util.function.Consumer<java.util.Map<String, Object>> handler) throws Exception {
        queue.consume(message -> {
            String orderId = (String) message.get("orderId");

            if (idempotencyStore.isProcessed(orderId)) {
                System.out.println("Message " + orderId + " already processed. Skipping.");
                return;
            }

            handler.accept(message);
            idempotencyStore.markProcessed(orderId, 86_400_000L);
        });
    }
}
# idempotency.py
import redis.asyncio as aioredis
from typing import Protocol, runtime_checkable


@runtime_checkable
class IdempotencyStore(Protocol):
    async def is_processed(self, message_id: str) -> bool: ...
    async def mark_processed(self, message_id: str, expiry_ms: int) -> None: ...


class RedisIdempotencyStore:
    def __init__(self, redis_client: aioredis.Redis):
        self.redis = redis_client

    async def is_processed(self, message_id: str) -> bool:
        key = f"processed:{message_id}"
        return await self.redis.get(key) is not None

    async def mark_processed(self, message_id: str, expiry_ms: int = 86_400_000) -> None:
        key = f"processed:{message_id}"
        # PSETEX takes milliseconds, so sub-second expiries are not truncated to 0.
        await self.redis.psetex(key, expiry_ms, "1")


class IdempotentConsumer:
    def __init__(self, queue: RabbitMQConsumer, store: IdempotencyStore):
        self.queue = queue
        self.store = store

    async def consume(self, handler):
        async def idempotent_handler(message: dict):
            order_id = message.get("order_id") or message.get("orderId")

            if await self.store.is_processed(order_id):
                print(f"Message {order_id} already processed. Skipping.")
                return

            await handler(message)
            await self.store.mark_processed(order_id)

        await self.queue.consume(idempotent_handler)
// Idempotency.cs
using StackExchange.Redis;

public interface IIdempotencyStore
{
    Task<bool> IsProcessedAsync(string messageId);
    Task MarkProcessedAsync(string messageId, TimeSpan? expiry = null);
}

public class RedisIdempotencyStore : IIdempotencyStore
{
    private readonly IDatabase _redis;

    public RedisIdempotencyStore(IConnectionMultiplexer multiplexer)
    {
        _redis = multiplexer.GetDatabase();
    }

    public async Task<bool> IsProcessedAsync(string messageId)
    {
        return await _redis.KeyExistsAsync($"processed:{messageId}");
    }

    public async Task MarkProcessedAsync(string messageId, TimeSpan? expiry = null)
    {
        await _redis.StringSetAsync(
            $"processed:{messageId}", "1",
            expiry ?? TimeSpan.FromDays(1));
    }
}

public class IdempotentConsumer
{
    private readonly RabbitMQConsumer _queue;
    private readonly IIdempotencyStore _store;

    public IdempotentConsumer(RabbitMQConsumer queue, IIdempotencyStore store)
    {
        _queue = queue;
        _store = store;
    }

    public void Consume(Func<Dictionary<string, object>, Task> handler)
    {
        _queue.Consume(async message =>
        {
            var orderId = message.GetValueOrDefault("orderId")?.ToString() ?? string.Empty;

            if (await _store.IsProcessedAsync(orderId))
            {
                Console.WriteLine($"Message {orderId} already processed. Skipping.");
                return;
            }

            await handler(message);
            await _store.MarkProcessedAsync(orderId);
        });
    }
}
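
The consumers above check and mark in two separate steps, so two deliveries of the same message that arrive close together can both pass the check. One way to close that gap is to claim the ID atomically before running the handler. The sketch below illustrates the idea with a hypothetical in-memory `InMemoryClaimStore`; with Redis, the equivalent would presumably be a single `SET key value NX EX ttl` command. The names `claim`, `release`, and `handle_once` are assumptions made for illustration and are not part of the code above.

```python
import asyncio
import time


class InMemoryClaimStore:
    """Hypothetical stand-in for an atomic 'set if absent, with expiry' store."""

    def __init__(self) -> None:
        self._expires_at: dict[str, float] = {}

    async def claim(self, message_id: str, ttl_seconds: float) -> bool:
        # The check and the write happen with no await in between, so on a
        # single event loop no other coroutine can interleave between them.
        now = time.monotonic()
        expires = self._expires_at.get(message_id)
        if expires is not None and expires > now:
            return False  # already claimed by an earlier delivery
        self._expires_at[message_id] = now + ttl_seconds
        return True

    async def release(self, message_id: str) -> None:
        self._expires_at.pop(message_id, None)


async def handle_once(store, message: dict, handler, ttl_seconds: float = 86_400) -> bool:
    """Run the handler unless this order ID is already claimed; return True if it ran."""
    order_id = message.get("orderId")
    if not order_id:
        raise ValueError("message has no orderId to deduplicate on")
    if not await store.claim(order_id, ttl_seconds):
        return False
    try:
        await handler(message)
    except Exception:
        # Release the claim so a redelivery can retry the work.
        await store.release(order_id)
        raise
    return True
```

The trade-off is that a consumer that crashes after claiming but before finishing leaves the ID claimed until the TTL expires, so the TTL should be chosen with the broker's redelivery timing in mind.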

Backpressure and Rate Control

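Backpressure keeps producers from overwhelming consumers. The examples below combine two mechanisms: a prefetch limit with high and low queue-depth watermarks on the consumer side, and a token-bucket rate limiter on the producer side.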

interface BackpressureConfig {
  prefetchCount: number;
  highWaterMark: number;
  lowWaterMark: number;
}

class BackpressureController {
  private isPaused: boolean = false;

  async handleBackpressure(
    channel: amqp.Channel,
    queueName: string,
    config: BackpressureConfig
  ): Promise<void> {
    // Set prefetch (QoS) to limit in-flight messages
    await channel.prefetch(config.prefetchCount);

    // Monitor queue depth
    const checkQueueDepth = async () => {
      const queueInfo = await channel.checkQueue(queueName);
      const depth = queueInfo.messageCount;

      if (depth > config.highWaterMark && !this.isPaused) {
        console.log(
          `High backpressure: queue depth ${depth} > ${config.highWaterMark}`
        );
        this.isPaused = true;
        // Pause processing or reduce producer throughput
      } else if (depth < config.lowWaterMark && this.isPaused) {
        console.log(
          `Backpressure relieved: queue depth ${depth} < ${config.lowWaterMark}`
        );
        this.isPaused = false;
        // Resume processing
      }
    };

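    // Poll every 5 seconds; catch errors so a failed check does not become an unhandled rejection
    setInterval(() => {
      checkQueueDepth().catch((err) => console.error("Queue depth check failed:", err));
    }, 5000);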
  }
}

// Rate limiting producer
class RateLimitedProducer {
  private tokens: number;
  private lastRefillTime: number;

  constructor(
    private tokensPerSecond: number,
    private maxBurst: number
  ) {
    this.tokens = maxBurst;
    this.lastRefillTime = Date.now();
  }

  async acquire(): Promise<void> {
    const now = Date.now();
    const timePassed = (now - this.lastRefillTime) / 1000;
    
    this.tokens = Math.min(
      this.maxBurst,
      this.tokens + timePassed * this.tokensPerSecond
    );
    this.lastRefillTime = now;

    if (this.tokens < 1) {
      const waitTime = (1 - this.tokens) / this.tokensPerSecond * 1000;
      await new Promise((resolve) => setTimeout(resolve, waitTime));
      return this.acquire();
    }

    this.tokens -= 1;
  }

  async publishWithRateLimit(
    publisher: (msg: any) => Promise<void>,
    message: any
  ): Promise<void> {
    await this.acquire();
    await publisher(message);
  }
}
// BackpressureController.java
import com.rabbitmq.client.Channel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BackpressureController {
    private final AtomicBoolean isPaused = new AtomicBoolean(false);

    public void handleBackpressure(Channel channel, String queueName,
                                    int prefetchCount, int highWaterMark, int lowWaterMark) throws Exception {
        channel.basicQos(prefetchCount);

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                long depth = channel.messageCount(queueName);
                if (depth > highWaterMark && !isPaused.get()) {
                    System.out.printf("High backpressure: queue depth %d > %d%n", depth, highWaterMark);
                    isPaused.set(true);
                } else if (depth < lowWaterMark && isPaused.get()) {
                    System.out.printf("Backpressure relieved: queue depth %d < %d%n", depth, lowWaterMark);
                    isPaused.set(false);
                }
            } catch (Exception e) { e.printStackTrace(); }
        }, 0, 5, TimeUnit.SECONDS);
    }
}

// Token bucket rate limiter
public class RateLimitedProducer {
    private final double tokensPerSecond;
    private final double maxBurst;
    private double tokens;
    private long lastRefillTime;

    public RateLimitedProducer(double tokensPerSecond, double maxBurst) {
        this.tokensPerSecond = tokensPerSecond;
        this.maxBurst = maxBurst;
        this.tokens = maxBurst;
        this.lastRefillTime = System.currentTimeMillis();
    }

    public synchronized void acquire() throws InterruptedException {
        long now = System.currentTimeMillis();
        double timePassed = (now - lastRefillTime) / 1000.0;
        tokens = Math.min(maxBurst, tokens + timePassed * tokensPerSecond);
        lastRefillTime = now;

        if (tokens < 1) {
            long waitTime = (long)((1 - tokens) / tokensPerSecond * 1000);
            Thread.sleep(waitTime);
            acquire();
            return;
        }
        tokens -= 1;
    }

    public void publishWithRateLimit(java.util.function.Consumer<Object> publisher, Object message)
            throws InterruptedException {
        acquire();
        publisher.accept(message);
    }
}
# backpressure.py
import asyncio
import time
import aio_pika
from dataclasses import dataclass


@dataclass
class BackpressureConfig:
    prefetch_count: int
    high_water_mark: int
    low_water_mark: int


class BackpressureController:
    def __init__(self):
        self._is_paused = False

    async def handle_backpressure(
        self, channel: aio_pika.Channel, queue_name: str, config: BackpressureConfig
    ) -> None:
        await channel.set_qos(prefetch_count=config.prefetch_count)

        async def check_queue_depth():
            while True:
                await asyncio.sleep(5)
                try:
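                    # A passive declare asks the broker for the current queue state;
                    # get_queue(..., ensure=False) skips the declare, so no message count is available.
                    queue = await channel.declare_queue(queue_name, passive=True)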
                    depth = queue.declaration_result.message_count

                    if depth > config.high_water_mark and not self._is_paused:
                        print(f"High backpressure: queue depth {depth} > {config.high_water_mark}")
                        self._is_paused = True
                    elif depth < config.low_water_mark and self._is_paused:
                        print(f"Backpressure relieved: queue depth {depth} < {config.low_water_mark}")
                        self._is_paused = False
                except Exception as e:
                    print(f"Error checking queue depth: {e}")

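        # Keep a reference so the monitoring task is not garbage-collected while running
        self._monitor_task = asyncio.create_task(check_queue_depth())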


class RateLimitedProducer:
    def __init__(self, tokens_per_second: float, max_burst: float):
        self.tokens_per_second = tokens_per_second
        self.max_burst = max_burst
        self.tokens = max_burst
        self.last_refill_time = time.monotonic()

    async def acquire(self) -> None:
        now = time.monotonic()
        time_passed = now - self.last_refill_time
        self.tokens = min(self.max_burst, self.tokens + time_passed * self.tokens_per_second)
        self.last_refill_time = now

        if self.tokens < 1:
            wait_time = (1 - self.tokens) / self.tokens_per_second
            await asyncio.sleep(wait_time)
            return await self.acquire()

        self.tokens -= 1

    async def publish_with_rate_limit(self, publisher, message) -> None:
        await self.acquire()
        await publisher(message)
// BackpressureController.cs
using RabbitMQ.Client;

public record BackpressureConfig(int PrefetchCount, int HighWaterMark, int LowWaterMark);

public class BackpressureController
{
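    // volatile: the flag is written by the background task and may be read from other threads
    private volatile bool _isPaused;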

    public async Task HandleBackpressureAsync(IModel channel, string queueName, BackpressureConfig config)
    {
        channel.BasicQos(0, (ushort)config.PrefetchCount, false);

        _ = Task.Run(async () =>
        {
            while (true)
            {
                await Task.Delay(5000);
                try
                {
                    var queueInfo = channel.QueueDeclarePassive(queueName);
                    var depth = queueInfo.MessageCount;

                    if (depth > config.HighWaterMark && !_isPaused)
                    {
                        Console.WriteLine($"High backpressure: queue depth {depth} > {config.HighWaterMark}");
                        _isPaused = true;
                    }
                    else if (depth < config.LowWaterMark && _isPaused)
                    {
                        Console.WriteLine($"Backpressure relieved: queue depth {depth} < {config.LowWaterMark}");
                        _isPaused = false;
                    }
                }
                catch (Exception e) { Console.Error.WriteLine($"Error checking queue: {e.Message}"); }
            }
        });
    }
}

public class RateLimitedProducer
{
    private readonly double _tokensPerSecond;
    private readonly double _maxBurst;
    private double _tokens;
    private DateTimeOffset _lastRefillTime;
    private readonly SemaphoreSlim _lock = new(1, 1);

    public RateLimitedProducer(double tokensPerSecond, double maxBurst)
    {
        _tokensPerSecond = tokensPerSecond;
        _maxBurst = maxBurst;
        _tokens = maxBurst;
        _lastRefillTime = DateTimeOffset.UtcNow;
    }

    public async Task AcquireAsync()
    {
        while (true)
        {
            int waitMs;
            await _lock.WaitAsync();
            try
            {
                // Refill tokens based on elapsed time, capped at the burst size
                var now = DateTimeOffset.UtcNow;
                var timePassed = (now - _lastRefillTime).TotalSeconds;
                _tokens = Math.Min(_maxBurst, _tokens + timePassed * _tokensPerSecond);
                _lastRefillTime = now;

                if (_tokens >= 1)
                {
                    _tokens -= 1;
                    return;
                }
                waitMs = (int)Math.Ceiling((1 - _tokens) / _tokensPerSecond * 1000);
            }
            // Always release exactly the lock this call acquired
            finally { _lock.Release(); }

            // Wait outside the lock so other callers are not blocked, then retry
            await Task.Delay(waitMs);
        }
    }

    public async Task PublishWithRateLimitAsync(Func<object, Task> publisher, object message)
    {
        await AcquireAsync();
        await publisher(message);
    }
}
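
The controllers above flip a paused flag at the watermarks but leave the actual pause and resume step as a comment. As a rough sketch of how a producer might act on that flag, the hypothetical `WatermarkGate` below blocks publishing while the queue is above the high mark and lets it continue once depth falls below the low mark. The class, `publish_when_allowed`, and the idea of feeding depth in from the monitoring loop are assumptions for illustration, not part of the original controllers.

```python
import asyncio


class WatermarkGate:
    """Hypothetical gate: closes above the high mark, reopens below the low mark."""

    def __init__(self, high_water_mark: int, low_water_mark: int) -> None:
        if low_water_mark >= high_water_mark:
            raise ValueError("low_water_mark must be below high_water_mark")
        self.high = high_water_mark
        self.low = low_water_mark
        self._open = asyncio.Event()
        self._open.set()  # start open: publishing is allowed

    @property
    def is_paused(self) -> bool:
        return not self._open.is_set()

    def update(self, depth: int) -> None:
        """Feed in the latest queue depth from the monitoring loop."""
        if depth > self.high:
            self._open.clear()
        elif depth < self.low:
            self._open.set()
        # Between the two marks the previous state is kept (hysteresis),
        # which avoids rapid pause/resume flapping around one threshold.

    async def wait_until_open(self) -> None:
        await self._open.wait()


async def publish_when_allowed(gate: WatermarkGate, publisher, message) -> None:
    # Blocks while the gate is closed, then hands the message to the publisher.
    await gate.wait_until_open()
    await publisher(message)
```

In this arrangement the depth check would call `gate.update(depth)` every few seconds, and producers would call `publish_when_allowed` instead of publishing directly. The gate should be created inside the running event loop.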

Monitoring: Queue Depth, Consumer Lag, and Processing Time

interface QueueMetrics {
  queueDepth: number;
  consumerLag: number;
  avgProcessingTime: number;
  errorRate: number;
  throughput: number; // messages/second
}

class QueueMonitor {
  private metrics: Map<string, QueueMetrics> = new Map();
  private processingTimes: number[] = [];
  private errorCount: number = 0;
  private successCount: number = 0;
  private lastReportTime: number = Date.now();

  recordProcessingTime(durationMs: number): void {
    this.processingTimes.push(durationMs);
    this.successCount++;

    // Keep only last 1000 measurements
    if (this.processingTimes.length > 1000) {
      this.processingTimes.shift();
    }
  }

  recordError(): void {
    this.errorCount++;
  }

  async getMetrics(
    channel: amqp.Channel,
    queueName: string
  ): Promise<QueueMetrics> {
    const queueInfo = await channel.checkQueue(queueName);
    const now = Date.now();
    const timeSinceLastReport = (now - this.lastReportTime) / 1000;

    const avgProcessingTime =
      this.processingTimes.length > 0
        ? this.processingTimes.reduce((a, b) => a + b, 0) / this.processingTimes.length
        : 0;

    const throughput =
      timeSinceLastReport > 0
        ? this.successCount / timeSinceLastReport
        : 0;

    const errorRate =
      this.successCount + this.errorCount > 0
        ? this.errorCount / (this.successCount + this.errorCount)
        : 0;

    const metrics: QueueMetrics = {
      queueDepth: queueInfo.messageCount,
      consumerLag: queueInfo.messageCount * (avgProcessingTime / 1000),
      avgProcessingTime,
      errorRate,
      throughput,
    };

    // Reset counters for next period
    this.lastReportTime = now;
    this.successCount = 0;
    this.errorCount = 0;

    return metrics;
  }

  async reportMetrics(
    channel: amqp.Channel,
    queueName: string
  ): Promise<void> {
    const metrics = await this.getMetrics(channel, queueName);

    console.log(`Queue: ${queueName}`);
    console.log(`  Depth: ${metrics.queueDepth} messages`);
    console.log(`  Lag: ${metrics.consumerLag.toFixed(2)} seconds`);
    console.log(`  Avg Processing Time: ${metrics.avgProcessingTime.toFixed(2)}ms`);
    console.log(`  Throughput: ${metrics.throughput.toFixed(2)} msg/s`);
    console.log(`  Error Rate: ${(metrics.errorRate * 100).toFixed(2)}%`);
  }
}
// QueueMonitor.java
import com.rabbitmq.client.Channel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

public class QueueMonitor {

    public record QueueMetrics(long queueDepth, double consumerLag, double avgProcessingTime,
                                double errorRate, double throughput) {}

    private final Deque<Long> processingTimes = new ArrayDeque<>();
    private final AtomicLong errorCount = new AtomicLong(0);
    private final AtomicLong successCount = new AtomicLong(0);
    private long lastReportTime = System.currentTimeMillis();

    public void recordProcessingTime(long durationMs) {
        processingTimes.addLast(durationMs);
        successCount.incrementAndGet();
        while (processingTimes.size() > 1000) processingTimes.pollFirst();
    }

    public void recordError() { errorCount.incrementAndGet(); }

    public QueueMetrics getMetrics(Channel channel, String queueName) throws Exception {
        long queueDepth = channel.messageCount(queueName);
        long now = System.currentTimeMillis();
        double timeSinceLastReport = (now - lastReportTime) / 1000.0;

        double avgProcessingTime = processingTimes.isEmpty() ? 0
            : processingTimes.stream().mapToLong(Long::longValue).average().orElse(0);

        long sc = successCount.get(), ec = errorCount.get();
        double throughput = timeSinceLastReport > 0 ? sc / timeSinceLastReport : 0;
        double errorRate = (sc + ec) > 0 ? (double) ec / (sc + ec) : 0;

        lastReportTime = now;
        successCount.set(0);
        errorCount.set(0);

        return new QueueMetrics(queueDepth, queueDepth * (avgProcessingTime / 1000),
            avgProcessingTime, errorRate, throughput);
    }

    public void reportMetrics(Channel channel, String queueName) throws Exception {
        QueueMetrics m = getMetrics(channel, queueName);
        System.out.printf("Queue: %s%n", queueName);
        System.out.printf("  Depth: %d messages%n", m.queueDepth());
        System.out.printf("  Lag: %.2f seconds%n", m.consumerLag());
        System.out.printf("  Avg Processing Time: %.2fms%n", m.avgProcessingTime());
        System.out.printf("  Throughput: %.2f msg/s%n", m.throughput());
        System.out.printf("  Error Rate: %.2f%%%n", m.errorRate() * 100);
    }
}
# queue_monitor.py
import time
import asyncio
from collections import deque
from dataclasses import dataclass
import aio_pika


@dataclass
class QueueMetrics:
    queue_depth: int
    consumer_lag: float
    avg_processing_time: float
    error_rate: float
    throughput: float


class QueueMonitor:
    def __init__(self):
        self.processing_times: deque[float] = deque(maxlen=1000)
        self.error_count = 0
        self.success_count = 0
        self.last_report_time = time.monotonic()

    def record_processing_time(self, duration_ms: float) -> None:
        self.processing_times.append(duration_ms)
        self.success_count += 1

    def record_error(self) -> None:
        self.error_count += 1

    async def get_metrics(self, channel: aio_pika.Channel, queue_name: str) -> QueueMetrics:
        queue = await channel.declare_queue(queue_name, passive=True)
        queue_depth = queue.declaration_result.message_count
        now = time.monotonic()
        time_since_last = now - self.last_report_time

        avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times else 0
        )
        throughput = self.success_count / time_since_last if time_since_last > 0 else 0
        total = self.success_count + self.error_count
        error_rate = self.error_count / total if total > 0 else 0

        self.last_report_time = now
        self.success_count = 0
        self.error_count = 0

        return QueueMetrics(
            queue_depth=queue_depth,
            consumer_lag=queue_depth * (avg_processing_time / 1000),
            avg_processing_time=avg_processing_time,
            error_rate=error_rate,
            throughput=throughput,
        )

    async def report_metrics(self, channel: aio_pika.Channel, queue_name: str) -> None:
        m = await self.get_metrics(channel, queue_name)
        print(f"Queue: {queue_name}")
        print(f"  Depth: {m.queue_depth} messages")
        print(f"  Lag: {m.consumer_lag:.2f} seconds")
        print(f"  Avg Processing Time: {m.avg_processing_time:.2f}ms")
        print(f"  Throughput: {m.throughput:.2f} msg/s")
        print(f"  Error Rate: {m.error_rate * 100:.2f}%")
// QueueMonitor.cs
using RabbitMQ.Client;
using System.Collections.Concurrent;

public record QueueMetrics(
    long QueueDepth, double ConsumerLag, double AvgProcessingTime,
    double ErrorRate, double Throughput);

public class QueueMonitor
{
    private readonly ConcurrentQueue<double> _processingTimes = new();
    private long _errorCount;
    private long _successCount;
    private DateTimeOffset _lastReportTime = DateTimeOffset.UtcNow;

    public void RecordProcessingTime(double durationMs)
    {
        _processingTimes.Enqueue(durationMs);
        Interlocked.Increment(ref _successCount);

        // Keep only last 1000
        while (_processingTimes.Count > 1000)
            _processingTimes.TryDequeue(out _);
    }

    public void RecordError() => Interlocked.Increment(ref _errorCount);

    public QueueMetrics GetMetrics(IModel channel, string queueName)
    {
        var queueInfo = channel.QueueDeclarePassive(queueName);
        var now = DateTimeOffset.UtcNow;
        var timeSinceLastReport = (now - _lastReportTime).TotalSeconds;

        var times = _processingTimes.ToArray();
        double avgProcessingTime = times.Length > 0 ? times.Average() : 0;

        long sc = Interlocked.Exchange(ref _successCount, 0);
        long ec = Interlocked.Exchange(ref _errorCount, 0);

        double throughput = timeSinceLastReport > 0 ? sc / timeSinceLastReport : 0;
        double errorRate = (sc + ec) > 0 ? (double)ec / (sc + ec) : 0;

        _lastReportTime = now;

        return new QueueMetrics(
            queueInfo.MessageCount,
            queueInfo.MessageCount * (avgProcessingTime / 1000),
            avgProcessingTime,
            errorRate,
            throughput);
    }

    public void ReportMetrics(IModel channel, string queueName)
    {
        var m = GetMetrics(channel, queueName);
        Console.WriteLine($"Queue: {queueName}");
        Console.WriteLine($"  Depth: {m.QueueDepth} messages");
        Console.WriteLine($"  Lag: {m.ConsumerLag:F2} seconds");
        Console.WriteLine($"  Avg Processing Time: {m.AvgProcessingTime:F2}ms");
        Console.WriteLine($"  Throughput: {m.Throughput:F2} msg/s");
        Console.WriteLine($"  Error Rate: {m.ErrorRate * 100:F2}%");
    }
}
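
The monitors above expect the caller to report each processing time and error, and their lag figure (depth multiplied by average processing time) implicitly assumes a single consumer working serially. The sketch below shows one possible way to wire a handler into such a monitor and to adjust the drain estimate for several consumers. `instrument` and `estimate_drain_seconds` are hypothetical helpers; the only assumption about the monitor is that it exposes `record_processing_time(duration_ms)` and `record_error()` as in the Python `QueueMonitor`.

```python
import time


def estimate_drain_seconds(queue_depth: int, avg_processing_ms: float, consumers: int = 1) -> float:
    """Rough time to empty the queue, assuming no new messages arrive."""
    if consumers < 1:
        raise ValueError("consumers must be at least 1")
    return queue_depth * (avg_processing_ms / 1000) / consumers


def instrument(handler, monitor):
    """Wrap an async handler so every call is recorded on the monitor."""

    async def wrapped(message):
        started = time.perf_counter()
        try:
            result = await handler(message)
        except Exception:
            monitor.record_error()
            raise  # let the consumer's retry or DLQ logic see the failure
        elapsed_ms = (time.perf_counter() - started) * 1000
        monitor.record_processing_time(elapsed_ms)
        return result

    return wrapped
```

For example, 500 queued messages at an average of 40 ms each take about 20 seconds to drain with one consumer and about 5 seconds with four, which is why the consumer count matters when alerting on lag.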

Message Queue Comparison

| Feature | RabbitMQ | Amazon SQS | Redis Streams |
| --- | --- | --- | --- |
| Delivery Guarantee | At-most-once / at-least-once (exactly-once only via idempotent consumers) | At-least-once | At-least-once |
| Message Ordering | Per-queue (FIFO queues) | FIFO queues only | Partial (consumer groups) |
| Routing | Exchanges + bindings | Simple queue model | Stream keys |
| Persistence | Disk + memory | Durable (AWS managed) | Memory (snapshots) |
| Throughput | High | Very high | High |
| Latency | Low | Low (API call overhead) | Very low |
| Operational Complexity | High (self-hosted) | Low (managed) | Medium |
| Dead-Letter Queues | Built-in | Built-in | Manual implementation |
| Monitoring | Built-in plugin | CloudWatch | Redis-native tools |
| Cost | Infrastructure | Pay-per-request | Infrastructure |
| Best For | Complex routing, high reliability | Scalability, AWS ecosystem | High throughput, low latency |

Production Checklist

  • Connection Pooling: Reuse connections; create a pool of channels/connections
  • Error Handling: Implement exponential backoff for transient failures
  • Monitoring: Track queue depth, consumer lag, error rates, processing time
  • Graceful Shutdown: Drain in-flight messages before stopping consumers
  • Message Serialization: Use versioned schemas (Protocol Buffers, Avro) for backward compatibility
  • Idempotency: Design consumers to handle duplicate message processing safely
  • Dead-Letter Queues: Configure DLQs for failed messages; monitor and alert on DLQ growth
  • Acknowledgment Strategy: Use manual acknowledgment (not auto-ack) for at-least-once delivery
  • Timeouts: Set consumer processing timeouts shorter than visibility timeout
  • Backpressure: Implement prefetch limits and monitor queue depth
  • Retry Strategy: Exponential backoff with jitter; max retries before DLQ
  • Security: Use TLS for transport, authenticate producers/consumers
  • Testing: Integration tests against a containerized message broker (Testcontainers)
  • Documentation: Document message schemas, routing rules, SLAs
  • Disaster Recovery: Test failover scenarios; document recovery procedures
  • Message TTL: Set message expiration to prevent indefinite queue buildup
  • Circuit Breaker: Stop consuming if downstream service is down; fail fast
  • Capacity Planning: Monitor throughput trends; plan for peak loads
  • Alerting: Alert on queue depth thresholds, error rate spikes, consumer lag growth
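
The retry item in the checklist can be sketched in a few lines. The example below uses the "full jitter" variant of exponential backoff, where each delay is drawn uniformly between zero and an exponentially growing ceiling. The function names, the 0.5 second base, the 30 second cap, and the limit of five retries are all illustrative assumptions and should be tuned per workload.

```python
import random


def backoff_delay(attempt: int, base_seconds: float = 0.5,
                  cap_seconds: float = 30.0, rng=random) -> float:
    """Full-jitter delay in seconds for a 0-based retry attempt."""
    # The ceiling doubles with each attempt but never exceeds the cap.
    ceiling = min(cap_seconds, base_seconds * (2 ** attempt))
    # Randomizing within the ceiling spreads retries from many consumers apart.
    return rng.uniform(0, ceiling)


def next_action(attempt: int, max_retries: int = 5):
    """Return ('retry', delay) or ('dead_letter', None) once retries run out."""
    if attempt >= max_retries:
        return ("dead_letter", None)
    return ("retry", backoff_delay(attempt))
```

A consumer would call `next_action` after a failed attempt, sleep for the returned delay before retrying, and route the message to the dead-letter queue when told to.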

Message queues are foundational to reliable distributed systems. The patterns and implementations covered here scale from small teams to enterprise deployments. Start simple, monitor closely, and evolve your queue infrastructure as your system grows.


Written by Palakorn Voramongkol

Software Engineer Specialist with 20+ years of experience. Writing about architecture, performance, and building production systems.
