Skip to content

消费者确认

概述

消费者确认(Consumer Acknowledgements)是 RabbitMQ 保证消息可靠消费的机制。消费者在处理完消息后发送确认,Broker 才会删除消息,否则消息会被重新投递。

核心原理

确认流程

mermaid
sequenceDiagram
    participant B as Broker
    participant C as 消费者
    
    B->>C: 投递消息
    Note over C: 消息状态: unacked
    
    alt 处理成功
        C->>B: ACK
        B->>B: 删除消息
    else 处理失败-可重试
        C->>B: NACK(requeue=true)
        B->>B: 重新入队
    else 处理失败-不可重试
        C->>B: REJECT(requeue=false)
        B->>B: 丢弃/死信
    end

确认方式

方式方法说明
自动确认no_ack=true消息投递后立即确认
手动确认basic_ack确认单条或批量消息
拒绝重入basic_nack拒绝并可选择重新入队
拒绝丢弃basic_reject拒绝单条消息

消息状态

mermaid
stateDiagram-v2
    [*] --> Ready: 消息入队
    Ready --> Unacked: 投递给消费者
    Unacked --> Ready: NACK(requeue=true)
    Unacked --> Acked: ACK
    Unacked --> Dead: REJECT/NACK(requeue=false)
    Acked --> [*]: 删除
    Dead --> [*]: 死信/丢弃

PHP 代码示例

自动确认模式

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'auto-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "自动确认模式消费者\n";
echo "警告: 消息投递后立即确认,可能丢失消息\n";

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    echo "收到消息: " . json_encode($data) . "\n";
    
    // 处理消息(即使处理失败,消息也已确认)
    processMessage($data);
};

// no_ack = true,自动确认
$channel->basic_consume($queueName, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

手动确认模式

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'manual-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "手动确认模式消费者\n";

// 设置 prefetch,一次只处理一条消息
$channel->basic_qos(null, 1, null);

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    
    echo "收到消息: " . json_encode($data) . "\n";
    
    try {
        // 处理消息
        $result = processMessage($data);
        
        if ($result) {
            // 处理成功,确认消息
            $msg->ack();
            echo "消息已确认\n";
        } else {
            // 处理失败,拒绝并重新入队
            $msg->nack(true);
            echo "消息已拒绝,重新入队\n";
        }
        
    } catch (Exception $e) {
        echo "处理异常: " . $e->getMessage() . "\n";
        
        // 根据异常类型决定处理方式
        if ($e instanceof RecoverableException) {
            // 可恢复异常,重新入队
            $msg->nack(true);
        } else {
            // 不可恢复异常,丢弃消息
            $msg->reject(false);
        }
    }
};

// no_ack = false,手动确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

function processMessage($data)
{
    // 业务处理逻辑
    return true;
}

class RecoverableException extends Exception {}

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

批量确认

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'batch-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "批量确认模式消费者\n";

$batchSize = 10;
$processedCount = 0;
$lastDeliveryTag = 0;

$callback = function (AMQPMessage $msg) use (&$processedCount, &$lastDeliveryTag, $batchSize, $channel) {
    $data = json_decode($msg->getBody(), true);
    $deliveryTag = $msg->getDeliveryTag();
    
    echo "处理消息 #{$deliveryTag}\n";
    
    // 处理消息
    processMessage($data);
    
    $processedCount++;
    $lastDeliveryTag = $deliveryTag;
    
    // 达到批次大小时批量确认
    if ($processedCount >= $batchSize) {
        // multiple = true,确认到当前 tag 的所有消息
        $channel->basic_ack($lastDeliveryTag, true);
        echo "批量确认 {$processedCount} 条消息\n";
        
        $processedCount = 0;
        $lastDeliveryTag = 0;
    }
};

$channel->basic_qos(null, $batchSize, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

function processMessage($data)
{
    // 业务处理
}

while ($channel->is_consuming()) {
    $channel->wait();
}

// 确认剩余消息
if ($processedCount > 0) {
    $channel->basic_ack($lastDeliveryTag, true);
    echo "确认剩余 {$processedCount} 条消息\n";
}

$channel->close();
$connection->close();

消费者确认管理类

php
<?php

use PhpAmqpLib\Message\AMQPMessage;

class ConsumerAckManager
{
    private $channel;
    private $unackedMessages = [];
    private $batchSize;
    private $autoFlush;
    
    public function __construct($channel, $batchSize = 1, $autoFlush = true)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->autoFlush = $autoFlush;
    }
    
    public function track(AMQPMessage $msg)
    {
        $deliveryTag = $msg->getDeliveryTag();
        $this->unackedMessages[$deliveryTag] = [
            'msg' => $msg,
            'received_at' => time()
        ];
    }
    
    public function ack(AMQPMessage $msg, $multiple = false)
    {
        $deliveryTag = $msg->getDeliveryTag();
        
        if ($multiple) {
            // 批量确认
            $this->channel->basic_ack($deliveryTag, true);
            
            foreach ($this->unackedMessages as $tag => $info) {
                if ($tag <= $deliveryTag) {
                    unset($this->unackedMessages[$tag]);
                }
            }
        } else {
            // 单条确认
            $msg->ack();
            unset($this->unackedMessages[$deliveryTag]);
        }
    }
    
    public function nack(AMQPMessage $msg, $requeue = true)
    {
        $msg->nack($requeue);
        unset($this->unackedMessages[$msg->getDeliveryTag()]);
    }
    
    public function reject(AMQPMessage $msg, $requeue = false)
    {
        $msg->reject($requeue);
        unset($this->unackedMessages[$msg->getDeliveryTag()]);
    }
    
    public function flush()
    {
        if (empty($this->unackedMessages)) {
            return;
        }
        
        $lastTag = max(array_keys($this->unackedMessages));
        $this->channel->basic_ack($lastTag, true);
        $this->unackedMessages = [];
    }
    
    public function getUnackedCount()
    {
        return count($this->unackedMessages);
    }
    
    public function getUnackedMessages()
    {
        return $this->unackedMessages;
    }
}

实际应用场景

1. 可靠消息处理

php
<?php

class ReliableConsumer
{
    private $channel;
    private $queueName;
    private $ackManager;
    private $maxRetries = 3;
    
    public function __construct($channel, $queueName)
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
        $this->ackManager = new ConsumerAckManager($channel);
    }
    
    public function consume(callable $handler)
    {
        $this->channel->basic_qos(null, 1, null);
        
        $callback = function ($msg) use ($handler) {
            $this->ackManager->track($msg);
            
            $data = json_decode($msg->getBody(), true);
            $retryCount = $this->getRetryCount($msg);
            
            try {
                $handler($data);
                $this->ackManager->ack($msg);
                
            } catch (RecoverableException $e) {
                if ($retryCount < $this->maxRetries) {
                    $this->ackManager->nack($msg, true);
                    echo "消息重新入队,重试 " . ($retryCount + 1) . "/{$this->maxRetries}\n";
                } else {
                    $this->ackManager->reject($msg, false);
                    echo "消息超过最大重试次数,已拒绝\n";
                }
                
            } catch (UnrecoverableException $e) {
                $this->ackManager->reject($msg, false);
                echo "不可恢复错误,消息已拒绝\n";
            }
        };
        
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function getRetryCount($msg)
    {
        if (!$msg->has('application_headers')) {
            return 0;
        }
        
        $headers = $msg->get('application_headers')->getNativeData();
        return $headers['x-retry-count'] ?? 0;
    }
}

class RecoverableException extends Exception {}
class UnrecoverableException extends Exception {}

2. 事务性消费

php
<?php

class TransactionalConsumer
{
    private $channel;
    private $db;
    
    public function __construct($channel, $db)
    {
        $this->channel = $channel;
        $this->db = $db;
    }
    
    public function consume($queueName, callable $handler)
    {
        $this->channel->basic_qos(null, 1, null);
        
        $callback = function ($msg) use ($handler) {
            $data = json_decode($msg->getBody(), true);
            
            try {
                // 开始数据库事务
                $this->db->beginTransaction();
                
                // 执行业务逻辑
                $handler($data, $this->db);
                
                // 提交事务
                $this->db->commit();
                
                // 确认消息
                $msg->ack();
                
                echo "消息处理成功\n";
                
            } catch (Exception $e) {
                // 回滚事务
                $this->db->rollBack();
                
                // 拒绝消息
                $msg->nack(true);
                
                echo "消息处理失败: " . $e->getMessage() . "\n";
            }
        };
        
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

3. 多消费者负载均衡

php
<?php

class LoadBalancedConsumer
{
    private $channel;
    private $queueName;
    private $consumerCount;
    
    public function __construct($channel, $queueName, $consumerCount = 1)
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
        $this->consumerCount = $consumerCount;
    }
    
    public function consume(callable $handler)
    {
        // 设置公平分发
        $this->channel->basic_qos(null, 1, null);
        
        for ($i = 0; $i < $this->consumerCount; $i++) {
            $consumerTag = 'consumer-' . $i;
            
            $callback = function ($msg) use ($handler, $consumerTag) {
                $data = json_decode($msg->getBody(), true);
                
                echo "[{$consumerTag}] 处理消息\n";
                
                try {
                    $handler($data);
                    $msg->ack();
                } catch (Exception $e) {
                    $msg->nack(true);
                }
            };
            
            $this->channel->basic_consume(
                $this->queueName,
                $consumerTag,
                false,
                false,
                false,
                false,
                $callback
            );
        }
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

常见问题与解决方案

问题 1: 消息重复消费

症状: 同一消息被多次处理

解决方案:

php
<?php

// 实现幂等性
$callback = function ($msg) use ($redis) {
    $data = json_decode($msg->getBody(), true);
    $messageId = $data['message_id'];
    
    // 检查是否已处理
    if ($redis->exists("processed:{$messageId}")) {
        $msg->ack();  // 已处理,直接确认
        return;
    }
    
    // 处理消息
    processMessage($data);
    
    // 标记为已处理
    $redis->setex("processed:{$messageId}", 86400, 1);
    
    $msg->ack();
};

问题 2: 消息积压

症状: 队列中消息越来越多

解决方案:

php
<?php

// 设置合理的 prefetch
$channel->basic_qos(null, 10, null);  // 每次预取 10 条

// 增加消费者数量
// 或优化处理逻辑

// 监控队列深度
list(, $messageCount,) = $channel->queue_declare($queueName, true);
echo "队列消息数: {$messageCount}\n";

问题 3: 消费者断开消息丢失

症状: 消费者断开后消息丢失

解决方案:

php
<?php

// 使用手动确认模式
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

// 处理完成后再确认
$callback = function ($msg) {
    try {
        processMessage($msg);
        $msg->ack();  // 处理成功后确认
    } catch (Exception $e) {
        $msg->nack(true);  // 处理失败,重新入队
    }
};

最佳实践建议

  1. 使用手动确认: 避免自动确认导致消息丢失
  2. 设置 prefetch: 合理设置预取数量,实现公平分发
  3. 实现幂等性: 处理重复消息
  4. 异常分类处理: 区分可恢复和不可恢复错误
  5. 监控确认状态: 监控未确认消息数量

相关链接