Skip to content

死信队列

概述

死信队列(Dead Letter Queue,DLQ)用于处理无法被正常消费的消息。这些消息可能是由于多次重试失败、消息过期或被拒绝等原因进入死信队列,便于后续分析和处理。

核心原理

死信队列是一种特殊的消息处理机制,当消息无法被正常消费时,会被转移到死信队列而不是直接丢弃。触发死信队列的条件包括:

  1. 消息被拒绝: 消费者拒绝消息且不重新入队
  2. 消息过期: 消息超过 TTL 仍未被消费
  3. 队列溢出: 队列达到最大长度
mermaid
graph TD
    subgraph 主队列处理流程
        P[生产者] --> Q[主队列]
        Q --> C{消费者处理}
        
        C -->|成功| ACK[确认]
        C -->|失败| R[拒绝]
        
        R -->|重新入队| Q
        R -->|不重新入队| DLQ[死信队列]
    end
    
    subgraph 消息过期场景
        TTL[消息TTL] --> Q2[主队列]
        Q2 -->|过期| DLX[死信交换机]
        DLX --> DLQ
    end
    
    subgraph 队列溢出场景
        MAX[队列满] --> Q3[主队列]
        Q3 -->|溢出| DLX2[死信交换机]
        DLX2 --> DLQ
    end
    
    style DLQ fill:#f9f,stroke:#333

死信队列配置参数

参数说明
x-dead-letter-exchange死信交换机名称
x-dead-letter-routing-key死信路由键
x-max-length队列最大消息数
x-max-length-bytes队列最大字节数

PHP 代码示例

配置死信队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 1. 声明死信交换机
$dlxExchange = 'orders_dlx';
$channel->exchange_declare($dlxExchange, 'direct', false, true, false);

// 2. 声明死信队列
$dlqQueue = 'orders_dlq';
$channel->queue_declare($dlqQueue, false, true, false, false);
$channel->queue_bind($dlqQueue, $dlxExchange, 'failed.orders');

// 3. 声明主队列,配置死信交换机
$mainQueue = 'orders';
$channel->queue_declare(
    $mainQueue,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-dead-letter-exchange' => $dlxExchange,
        'x-dead-letter-routing-key' => 'failed.orders',
        'x-message-ttl' => 86400000,           // 24小时
        'x-max-length' => 10000                 // 最多10000条
    ])
);

echo "死信队列配置完成\n";

$channel->close();
$connection->close();

发送消息并观察死信

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$mainQueue = 'orders';
$channel->queue_declare($mainQueue, false, true, false, false);

// 发送正常消息
$normalMessage = new AMQPMessage(
    json_encode(['order_id' => 'ORD-001', 'status' => 'pending']),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($normalMessage, '', $mainQueue);

// 发送会失败的消息(用于测试死信)
$failingMessage = new AMQPMessage(
    json_encode(['order_id' => 'ORD-002', 'status' => 'invalid']),
    [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'headers' => ['x-death-count' => 0]
    ]
);
$channel->basic_publish($failingMessage, '', $mainQueue);

echo "消息已发送\n";

$channel->close();
$connection->close();

消费死信队列消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$dlqQueue = 'orders_dlq';

echo "监听死信队列...\n";

$callback = function (AMQPMessage $msg) {
    $headers = $msg->has('application_headers') 
        ? $msg->get('application_headers')->getNativeData() 
        : [];
    
    echo "收到死信消息:\n";
    echo "  内容: " . $msg->getBody() . "\n";
    echo "  死亡原因: " . ($headers['x-first-death-reason'] ?? 'unknown') . "\n";
    echo "  死亡时间: " . ($headers['x-first-death-timestamp'] ?? 'unknown') . "\n";
    echo "  重试次数: " . ($headers['x-death-count'] ?? 0) . "\n";
    echo "-------------------\n";
    
    // 分析死信消息
    $this->analyzeDeadLetter($msg);
    
    $msg->ack();
};

$channel->basic_consume($dlqQueue, '', false, false, false, false, $callback);

function analyzeDeadLetter($msg)
{
    $data = json_decode($msg->getBody(), true);
    
    // 记录日志或告警
    error_log("死信消息: " . json_encode($data));
}

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

拒绝消息进入死信队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queue = 'orders';
$channel->queue_declare($queue, false, true, false, false);

echo "处理订单...\n";

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    
    try {
        // 处理订单
        processOrder($data);
        
        echo "订单处理成功: {$data['order_id']}\n";
        $msg->ack();
        
    } catch (Exception $e) {
        echo "订单处理失败: {$e->getMessage()}\n";
        
        // 记录重试次数
        $headers = $msg->has('application_headers') 
            ? $msg->get('application_headers')->getNativeData() 
            : [];
        $retryCount = ($headers['x-death-count'] ?? 0) + 1;
        
        if ($retryCount >= 3) {
            // 超过3次重试,拒绝消息进入死信队列
            echo "消息将进入死信队列\n";
            $msg->reject(false);  // false = 不重新入队
        } else {
            // 重新入队
            echo "消息重新入队,重试次数: {$retryCount}\n";
            $msg->nack(true);  // true = 重新入队
        }
    }
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queue, '', false, false, false, false, $callback);

function processOrder($data)
{
    if ($data['status'] === 'invalid') {
        throw new Exception('Invalid order status');
    }
    
    // 正常处理逻辑
}

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

实际应用场景

1. 订单处理失败分析

php
<?php

class OrderDeadLetterHandler
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupQueues();
    }
    
    private function setupQueues()
    {
        // 死信交换机和队列
        $this->channel->exchange_declare('order_dlx', 'topic', false, true, false);
        $this->channel->queue_declare('order_dlq', false, true, false, false);
        $this->channel->queue_bind('order_dlq', 'order_dlx', '#');
        
        // 主队列配置
        $this->channel->queue_declare(
            'orders',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => 'order_dlx',
                'x-message-ttl' => 3600000
            ])
        );
    }
    
    public function processDeadLetters()
    {
        $callback = function ($msg) {
            $data = json_decode($msg->getBody(), true);
            $headers = $msg->get('application_headers')->getNativeData();
            
            $reason = $headers['x-first-death-reason'] ?? 'unknown';
            
            switch ($reason) {
                case 'rejected':
                    $this->handleRejected($data, $headers);
                    break;
                case 'expired':
                    $this->handleExpired($data, $headers);
                    break;
                case 'maxlen':
                    $this->handleOverflow($data, $headers);
                    break;
                default:
                    $this->handleUnknown($data, $headers);
            }
            
            $msg->ack();
        };
        
        $this->channel->basic_consume('order_dlq', '', false, false, false, false, $callback);
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function handleRejected($data, $headers)
    {
        $retryCount = $headers['x-death-count'] ?? 0;
        
        if ($retryCount >= 3) {
            // 多次重试失败,记录并告警
            error_log("订单处理多次失败: " . json_encode($data));
            $this->notifyAdmin($data, 'rejected');
        }
    }
    
    private function handleExpired($data, $headers)
    {
        error_log("订单消息过期: " . json_encode($data));
    }
    
    private function handleOverflow($data, $headers)
    {
        error_log("订单队列溢出: " . json_encode($data));
    }
    
    private function handleUnknown($data, $headers)
    {
        error_log("未知原因死信: " . json_encode($data));
    }
    
    private function notifyAdmin($data, $reason)
    {
        // 发送告警通知
    }
}

2. 消息重试机制

php
<?php

class RetryableMessageHandler
{
    private $channel;
    private $maxRetries = 3;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupQueues();
    }
    
    private function setupQueues()
    {
        // 死信交换机
        $this->channel->exchange_declare('retry_dlx', 'direct', false, true, false);
        
        // 死信队列 - 转发回主队列实现重试
        $this->channel->queue_declare(
            'retry_dlq',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => 5000,  // 5秒后重试
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => 'orders'
            ])
        );
        $this->channel->queue_bind('retry_dlq', 'retry_dlx', 'retry');
        
        // 主队列
        $this->channel->queue_declare(
            'orders',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => 'retry_dlx',
                'x-dead-letter-routing-key' => 'retry'
            ])
        );
    }
    
    public function processMessages(callable $handler)
    {
        $callback = function ($msg) use ($handler) {
            $headers = $msg->get('application_headers')->getNativeData();
            $retryCount = $headers['x-death-count'] ?? 0;
            
            try {
                $data = json_decode($msg->getBody(), true);
                $handler($data);
                $msg->ack();
                
            } catch (Exception $e) {
                if ($retryCount >= $this->maxRetries) {
                    // 超过最大重试次数,进入真正的死信队列
                    $this->sendToDeadLetter($msg);
                    $msg->ack();
                } else {
                    // 拒绝消息,触发重试
                    $msg->reject(false);
                }
            }
        };
        
        $this->channel->basic_consume('orders', '', false, false, false, false, $callback);
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function sendToDeadLetter($msg)
    {
        $dlqMessage = new AMQPMessage(
            $msg->getBody(),
            array_merge($msg->has('application_headers') ? ['application_headers' => $msg->get('application_headers')] : [], [
                'delivery_mode' => $msg->getDeliveryMode()
            ])
        );
        
        $this->channel->basic_publish($dlqMessage, '', 'orders_final_dlq');
    }
}

常见问题与解决方案

问题 1: 消息进入死信队列的原因不明确

症状: 不知道消息为什么进入死信队列

解决方案: 分析死信消息的 headers

php
<?php

// 死信消息会包含以下 headers
$headers = [
    'x-first-death-exchange' => '原始交换机',
    'x-first-death-queue' => '原始队列',
    'x-first-death-reason' => 'rejected|expired|maxlen',
    'x-first-death-timestamp' => '死亡时间戳',
    'x-death' => '死亡信息数组',
    'x-death-count' => '死亡次数'
];

问题 2: 死信队列消息堆积

症状: 死信队列消息越来越多

解决方案: 定期清理和分析死信消息

php
<?php

// 监控死信队列消息数量
list(, $messageCount, ) = $channel->queue_declare('order_dlq', true);
echo "死信队列消息数: {$messageCount}\n";

// 定期归档或清理
if ($messageCount > 10000) {
    // 归档旧消息到数据库
    $this->archiveDeadLetters();
}

问题 3: 循环死信

症状: 消息在死信队列和主队列之间循环

解决方案: 设置合理的重试次数和死信规则

php
<?php

// 确保死信交换机配置正确,不要指回原队列
// 方案一:使用不同的路由键
new AMQPTable([
    'x-dead-letter-exchange' => 'dlx',
    'x-dead-letter-routing-key' => 'dlq.key'  // 不要和主队列的key相同
]);

// 方案二:使用队列参数限制
new AMQPTable([
    'x-dead-letter-exchange' => 'dlx',
    'x-max-requeue-length' => 0  // 不允许重新入队
]);

最佳实践建议

  1. 分离死信队列: 为不同类型的消息创建不同的死信队列
  2. 记录详细信息: 在消息中包含足够的上下文信息
  3. 定期分析: 定期分析死信消息,优化系统
  4. 告警机制: 死信队列有消息时发送告警
  5. 自动化处理: 实现死信消息的自动重试或人工处理流程

相关链接