Skip to content

消息拒绝与重回队列

概述

在 RabbitMQ 中,消费者可以拒绝消息而不是确认它。拒绝的消息可以选择重新入队(requeue)或直接丢弃。正确使用消息拒绝机制对于构建可靠的消息系统至关重要。

核心原理

消息拒绝方式

mermaid
graph TD
    subgraph 消息拒绝方式
        N[NACK] --> R1[requeue=true<br/>重新入队]
        N --> R2[requeue=false<br/>丢弃/死信]
        
        RE[REJECT] --> R3[requeue=true<br/>重新入队]
        RE --> R4[requeue=false<br/>丢弃/死信]
    end
    
    subgraph 拒绝后处理
        R1 --> Q[回到队列头部]
        R3 --> Q
        R2 --> DLQ[死信队列]
        R4 --> DLQ
        R2 --> D[直接丢弃]
        R4 --> D
    end
    
    style N fill:#87CEEB
    style RE fill:#90EE90

NACK vs REJECT

方法批量操作说明
basic_nack支持可以批量拒绝多条消息
basic_reject不支持只能拒绝单条消息

拒绝流程

mermaid
sequenceDiagram
    participant B as Broker
    participant C as 消费者
    
    B->>C: 投递消息
    C->>C: 处理消息
    
    alt 处理成功
        C->>B: ACK
        B->>B: 删除消息
    else 可恢复错误
        C->>B: NACK(requeue=true)
        B->>B: 消息重新入队
    else 不可恢复错误
        C->>B: REJECT(requeue=false)
        B->>B: 发送到死信队列
    end

PHP 代码示例

基本拒绝操作

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'reject-demo-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "消息拒绝示例\n";

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    
    echo "收到消息: " . json_encode($data) . "\n";
    
    // 方式一:使用 nack
    // $msg->nack($requeue);  // $requeue = true 重新入队,false 丢弃
    
    // 方式二:使用 reject
    // $msg->reject($requeue);  // $requeue = true 重新入队,false 丢弃
    
    // 示例:处理成功则确认,失败则拒绝
    if ($data['status'] === 'valid') {
        $msg->ack();
        echo "消息已确认\n";
    } else {
        $msg->reject(false);  // 不重新入队
        echo "消息已拒绝\n";
    }
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

带重试次数的拒绝

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明死信队列
$dlqExchange = 'dlx';
$channel->exchange_declare($dlqExchange, 'direct', false, true, false);
$channel->queue_declare('failed_messages', false, true, false, false);
$channel->queue_bind('failed_messages', $dlqExchange, 'failed');

// 声明主队列
$queueName = 'retry-queue';
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-dead-letter-exchange' => $dlqExchange,
        'x-dead-letter-routing-key' => 'failed'
    ])
);

$maxRetries = 3;

$callback = function (AMQPMessage $msg) use ($maxRetries) {
    $data = json_decode($msg->getBody(), true);
    
    // 获取重试次数
    $headers = $msg->has('application_headers')
        ? $msg->get('application_headers')->getNativeData()
        : [];
    $retryCount = $headers['x-retry-count'] ?? 0;
    
    echo sprintf(
        "处理消息 (重试次数: %d/%d): %s\n",
        $retryCount,
        $maxRetries,
        json_encode($data)
    );
    
    try {
        // 处理消息
        processMessage($data);
        
        $msg->ack();
        echo "消息处理成功\n";
        
    } catch (Exception $e) {
        echo "处理失败: " . $e->getMessage() . "\n";
        
        if ($retryCount < $maxRetries) {
            // 重新入队,增加重试计数
            $msg->nack(true);
            echo "消息重新入队\n";
        } else {
            // 超过最大重试次数,拒绝并进入死信队列
            $msg->reject(false);
            echo "消息超过最大重试次数,已发送到死信队列\n";
        }
    }
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

function processMessage($data)
{
    // 模拟随机失败
    if (rand(1, 3) === 1) {
        throw new Exception("随机处理失败");
    }
}

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

批量拒绝

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'batch-nack-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "批量拒绝示例\n";

// 使用 basic_nack 批量拒绝
$callback = function (AMQPMessage $msg) {
    $deliveryTag = $msg->getDeliveryTag();
    $data = json_decode($msg->getBody(), true);
    
    echo "处理消息: " . json_encode($data) . "\n";
    
    if ($data['batch_fail']) {
        // 批量拒绝:拒绝当前消息及之前所有未确认的消息
        // multiple = true 表示批量操作
        $msg->getChannel()->basic_nack(
            $deliveryTag,
            true,   // multiple: 批量拒绝
            true    // requeue: 重新入队
        );
        echo "批量拒绝到 tag: {$deliveryTag}\n";
    } else {
        // 单条拒绝
        $msg->nack(true);
        echo "单条拒绝\n";
    }
};

$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

消息拒绝处理器类

php
<?php

use PhpAmqpLib\Message\AMQPMessage;

class MessageRejectHandler
{
    private $maxRetries;
    private $retryDelay;
    
    public function __construct($maxRetries = 3, $retryDelay = 1000)
    {
        $this->maxRetries = $maxRetries;
        $this->retryDelay = $retryDelay;
    }
    
    public function handle(AMQPMessage $msg, callable $processor)
    {
        $retryCount = $this->getRetryCount($msg);
        
        try {
            $data = json_decode($msg->getBody(), true);
            $processor($data);
            
            $msg->ack();
            return true;
            
        } catch (RecoverableException $e) {
            return $this->handleRecoverable($msg, $retryCount, $e);
        } catch (UnrecoverableException $e) {
            return $this->handleUnrecoverable($msg, $e);
        }
    }
    
    private function getRetryCount(AMQPMessage $msg)
    {
        if (!$msg->has('application_headers')) {
            return 0;
        }
        
        $headers = $msg->get('application_headers')->getNativeData();
        return $headers['x-retry-count'] ?? 0;
    }
    
    private function handleRecoverable(AMQPMessage $msg, $retryCount, $exception)
    {
        if ($retryCount < $this->maxRetries) {
            // 延迟后重新入队
            usleep($this->retryDelay * 1000);
            $msg->nack(true);
            
            echo sprintf(
                "可恢复错误,重新入队 (重试 %d/%d): %s\n",
                $retryCount + 1,
                $this->maxRetries,
                $exception->getMessage()
            );
            
            return false;
        }
        
        return $this->handleUnrecoverable($msg, $exception);
    }
    
    private function handleUnrecoverable(AMQPMessage $msg, $exception)
    {
        $msg->reject(false);
        
        echo "不可恢复错误,消息已拒绝: " . $exception->getMessage() . "\n";
        
        return false;
    }
}

class RecoverableException extends Exception {}
class UnrecoverableException extends Exception {}

实际应用场景

1. 错误分类处理

php
<?php

class ErrorMessageHandler
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function handle(AMQPMessage $msg)
    {
        $data = json_decode($msg->getBody(), true);
        
        try {
            $this->process($data);
            $msg->ack();
            
        } catch (TemporaryException $e) {
            // 临时错误,稍后重试
            $this->handleTemporaryError($msg, $e);
            
        } catch (ValidationException $e) {
            // 验证错误,不重试
            $this->handleValidationError($msg, $e);
            
        } catch (BusinessException $e) {
            // 业务错误,记录后丢弃
            $this->handleBusinessError($msg, $e);
            
        } catch (SystemException $e) {
            // 系统错误,告警
            $this->handleSystemError($msg, $e);
        }
    }
    
    private function handleTemporaryError($msg, $exception)
    {
        // 临时错误,重新入队
        $msg->nack(true);
        $this->log("临时错误,重新入队: " . $exception->getMessage());
    }
    
    private function handleValidationError($msg, $exception)
    {
        // 验证错误,不重试,发送到错误队列
        $this->sendToErrorQueue($msg, 'validation_error');
        $msg->reject(false);
        $this->log("验证错误: " . $exception->getMessage());
    }
    
    private function handleBusinessError($msg, $exception)
    {
        // 业务错误,记录日志后丢弃
        $this->logBusinessError($msg, $exception);
        $msg->ack();  // 确认消息,不重试
    }
    
    private function handleSystemError($msg, $exception)
    {
        // 系统错误,告警并重新入队
        $this->alert($exception);
        $msg->nack(true);
    }
    
    private function process($data)
    {
        // 业务处理逻辑
    }
    
    private function sendToErrorQueue($msg, $reason)
    {
        // 发送到错误队列
    }
    
    private function log($message)
    {
        error_log($message);
    }
    
    private function logBusinessError($msg, $exception)
    {
        // 记录业务错误
    }
    
    private function alert($exception)
    {
        // 发送告警
    }
}

class TemporaryException extends Exception {}
class ValidationException extends Exception {}
class BusinessException extends Exception {}
class SystemException extends Exception {}

2. 延迟重试

php
<?php

class DelayedRetryHandler
{
    private $channel;
    private $retryDelays = [1000, 5000, 30000, 60000];  // 毫秒
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupRetryQueues();
    }
    
    private function setupRetryQueues()
    {
        foreach ($this->retryDelays as $index => $delay) {
            $queueName = "retry_{$index}";
            
            $this->channel->queue_declare(
                $queueName,
                false,
                true,
                false,
                false,
                false,
                new AMQPTable([
                    'x-message-ttl' => $delay,
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => 'main_queue'
                ])
            );
        }
    }
    
    public function handleWithRetry(AMQPMessage $msg, callable $processor)
    {
        $retryCount = $this->getRetryCount($msg);
        
        try {
            $processor(json_decode($msg->getBody(), true));
            $msg->ack();
            
        } catch (Exception $e) {
            if ($retryCount < count($this->retryDelays)) {
                // 发送到延迟重试队列
                $this->sendToRetryQueue($msg, $retryCount);
                $msg->ack();
                
                echo "消息已发送到重试队列,延迟: " . $this->retryDelays[$retryCount] . "ms\n";
            } else {
                // 超过最大重试次数
                $msg->reject(false);
                echo "消息超过最大重试次数\n";
            }
        }
    }
    
    private function sendToRetryQueue($msg, $retryCount)
    {
        $queueName = "retry_{$retryCount}";
        
        $message = new AMQPMessage(
            $msg->getBody(),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'application_headers' => new AMQPTable([
                    'x-retry-count' => $retryCount + 1
                ])
            ]
        );
        
        $this->channel->basic_publish($message, '', $queueName);
    }
    
    private function getRetryCount($msg)
    {
        if (!$msg->has('application_headers')) {
            return 0;
        }
        
        $headers = $msg->get('application_headers')->getNativeData();
        return $headers['x-retry-count'] ?? 0;
    }
}

3. 死信队列处理

php
<?php

class DeadLetterHandler
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function setupDeadLetterInfrastructure()
    {
        // 死信交换机
        $this->channel->exchange_declare('dlx', 'direct', false, true, false);
        
        // 死信队列
        $this->channel->queue_declare('dead_letter_queue', false, true, false, false);
        $this->channel->queue_bind('dead_letter_queue', 'dlx', 'dead');
        
        // 主队列(配置死信)
        $this->channel->queue_declare(
            'main_queue',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-dead-letter-exchange' => 'dlx',
                'x-dead-letter-routing-key' => 'dead'
            ])
        );
    }
    
    public function processDeadLetters()
    {
        $callback = function ($msg) {
            $headers = $msg->has('application_headers')
                ? $msg->get('application_headers')->getNativeData()
                : [];
            
            $deathInfo = $headers['x-death'] ?? [];
            
            echo "死信消息:\n";
            echo "  原始队列: " . ($deathInfo[0]['queue'] ?? 'unknown') . "\n";
            echo "  死亡原因: " . ($deathInfo[0]['reason'] ?? 'unknown') . "\n";
            echo "  死亡时间: " . date('Y-m-d H:i:s', $deathInfo[0]['time'] ?? 0) . "\n";
            echo "  消息内容: " . $msg->getBody() . "\n";
            
            // 分析并处理死信
            $this->analyzeAndHandle($msg, $deathInfo);
            
            $msg->ack();
        };
        
        $this->channel->basic_consume('dead_letter_queue', '', false, false, false, false, $callback);
    }
    
    private function analyzeAndHandle($msg, $deathInfo)
    {
        $reason = $deathInfo[0]['reason'] ?? 'unknown';
        
        switch ($reason) {
            case 'rejected':
                $this->handleRejected($msg);
                break;
            case 'expired':
                $this->handleExpired($msg);
                break;
            case 'maxlen':
                $this->handleOverflow($msg);
                break;
            default:
                $this->handleUnknown($msg);
        }
    }
    
    private function handleRejected($msg)
    {
        // 处理被拒绝的消息
        $this->logError($msg, 'rejected');
    }
    
    private function handleExpired($msg)
    {
        // 处理过期的消息
        $this->logError($msg, 'expired');
    }
    
    private function handleOverflow($msg)
    {
        // 处理溢出的消息
        $this->logError($msg, 'overflow');
    }
    
    private function handleUnknown($msg)
    {
        // 处理未知原因的消息
        $this->logError($msg, 'unknown');
    }
    
    private function logError($msg, $reason)
    {
        error_log("死信消息 [{$reason}]: " . $msg->getBody());
    }
}

常见问题与解决方案

问题 1: 无限重试循环

症状: 消息不断被拒绝和重新入队

解决方案:

php
<?php

// 设置最大重试次数
$maxRetries = 3;

$callback = function ($msg) use ($maxRetries) {
    $retryCount = getRetryCount($msg);
    
    try {
        process($msg);
        $msg->ack();
    } catch (Exception $e) {
        if ($retryCount < $maxRetries) {
            $msg->nack(true);  // 重新入队
        } else {
            $msg->reject(false);  // 发送到死信队列
        }
    }
};

问题 2: 消息顺序问题

症状: 拒绝的消息重新入队后顺序错乱

解决方案:

php
<?php

// 使用延迟队列保持顺序
// 被拒绝的消息发送到延迟队列,而不是直接重新入队

$retryQueue = 'retry_queue';
$this->channel->queue_declare(
    $retryQueue,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-message-ttl' => 5000,
        'x-dead-letter-exchange' => '',
        'x-dead-letter-routing-key' => 'main_queue'
    ])
);

// 拒绝时发送到重试队列
$msg->ack();  // 先确认
$this->channel->basic_publish($retryMessage, '', $retryQueue);  // 发送到重试队列

问题 3: 性能问题

症状: 大量消息拒绝导致性能下降

解决方案:

php
<?php

// 区分错误类型,避免不必要的重试
$callback = function ($msg) {
    try {
        process($msg);
        $msg->ack();
    } catch (ValidationException $e) {
        // 验证错误,不重试
        $msg->reject(false);
    } catch (TemporaryException $e) {
        // 临时错误,重试
        $msg->nack(true);
    }
};

最佳实践建议

  1. 设置重试上限: 避免无限重试
  2. 区分错误类型: 根据错误类型决定是否重试
  3. 使用死信队列: 处理失败的消息
  4. 记录错误日志: 便于问题排查
  5. 监控拒绝率: 监控消息拒绝情况

相关链接