Skip to content

消息确认机制

概述

消息确认机制是 RabbitMQ 保证消息可靠传递的核心机制。它确保消息从生产者到 Broker,以及从 Broker 到消费者的过程中不会丢失。

核心原理

消息确认流程

mermaid
sequenceDiagram
    participant P as 生产者
    participant B as Broker
    participant C as 消费者
    
    Note over P,B: 生产者确认
    P->>B: 发送消息
    B->>B: 持久化消息
    B-->>P: 确认 (ACK)
    
    Note over B,C: 消费者确认
    B->>C: 投递消息
    C->>C: 处理消息
    C-->>B: 确认 (ACK)
    B->>B: 删除消息

确认类型

mermaid
graph TD
    subgraph 生产者确认
        PA[Publisher Confirm] --> PS[同步确认]
        PA --> PA2[异步确认]
        PA --> PB[批量确认]
    end
    
    subgraph 消费者确认
        CA[Consumer ACK] --> ACK[确认]
        CA --> NACK[拒绝]
        CA --> REJECT[拒绝不重入]
    end
    
    subgraph 事务
        TX[Transaction] --> TX1[开启事务]
        TX1 --> TX2[发送消息]
        TX2 --> TX3[提交事务]
    end

确认方式对比

方式可靠性性能复杂度
自动确认
手动确认
事务
发布确认

PHP 代码示例

自动确认模式

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'auto-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "自动确认模式消费者\n";

$callback = function (AMQPMessage $msg) {
    echo "收到消息: " . $msg->getBody() . "\n";
    // 消息投递后立即确认,无论处理是否成功
};

// 第三个参数 true 表示自动确认
$channel->basic_consume($queueName, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

手动确认模式

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'manual-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "手动确认模式消费者\n";

$callback = function (AMQPMessage $msg) {
    try {
        $data = json_decode($msg->getBody(), true);
        
        echo "处理消息: " . json_encode($data) . "\n";
        
        // 业务处理
        processMessage($data);
        
        // 确认消息
        $msg->ack();
        echo "消息已确认\n";
        
    } catch (Exception $e) {
        echo "处理失败: " . $e->getMessage() . "\n";
        
        // 拒绝消息,重新入队
        $msg->nack(true);
        echo "消息已拒绝,重新入队\n";
    }
};

// 设置 prefetch,一次只处理一条消息
$channel->basic_qos(null, 1, null);

// 第三个参数 false 表示手动确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

function processMessage($data)
{
    // 模拟业务处理
    if (rand(1, 10) === 1) {
        throw new Exception("随机失败");
    }
}

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

批量确认

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'batch-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "批量确认模式消费者\n";

$batchSize = 10;
$processedCount = 0;
$deliveryTags = [];

$callback = function (AMQPMessage $msg) use (&$processedCount, &$deliveryTags, $batchSize, $channel) {
    $deliveryTags[] = $msg->getDeliveryTag();
    $processedCount++;
    
    echo "处理消息 #{$processedCount}\n";
    
    // 处理消息
    processMessage(json_decode($msg->getBody(), true));
    
    // 达到批量大小后批量确认
    if ($processedCount >= $batchSize) {
        // 批量确认(确认最后一个 tag,multiple=true 会确认之前的所有消息)
        $channel->basic_ack($msg->getDeliveryTag(), true);
        echo "批量确认 {$processedCount} 条消息\n";
        
        $processedCount = 0;
        $deliveryTags = [];
    }
};

$channel->basic_qos(null, $batchSize, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

function processMessage($data)
{
    // 业务处理
}

while ($channel->is_consuming()) {
    $channel->wait();
}

// 处理剩余消息
if ($processedCount > 0) {
    $channel->basic_ack(end($deliveryTags), true);
    echo "确认剩余 {$processedCount} 条消息\n";
}

$channel->close();
$connection->close();

消息拒绝

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'reject-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "消息拒绝示例\n";

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    $retryCount = $msg->get('application_headers')
        ? $msg->get('application_headers')->getNativeData()['x-retry-count'] ?? 0
        : 0;
    
    try {
        processMessage($data);
        $msg->ack();
        echo "消息处理成功\n";
        
    } catch (RecoverableException $e) {
        // 可恢复错误,重新入队
        if ($retryCount < 3) {
            $msg->nack(true);  // requeue = true
            echo "消息重新入队,重试次数: " . ($retryCount + 1) . "\n";
        } else {
            // 超过重试次数,发送到死信队列
            $msg->nack(false);  // requeue = false
            echo "消息超过重试次数,已拒绝\n";
        }
        
    } catch (UnrecoverableException $e) {
        // 不可恢复错误,直接拒绝
        $msg->reject(false);  // requeue = false
        echo "消息无法处理,已拒绝\n";
    }
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

class RecoverableException extends Exception {}
class UnrecoverableException extends Exception {}

function processMessage($data)
{
    // 业务处理
}

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

确认管理类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class AckManager
{
    private $channel;
    private $pendingAcks = [];
    private $batchSize;
    
    public function __construct($channel, $batchSize = 10)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
    }
    
    public function ack(AMQPMessage $msg)
    {
        $deliveryTag = $msg->getDeliveryTag();
        $this->pendingAcks[] = $deliveryTag;
        
        if (count($this->pendingAcks) >= $this->batchSize) {
            $this->flushAcks();
        }
    }
    
    public function ackImmediately(AMQPMessage $msg)
    {
        $msg->ack();
    }
    
    public function nack(AMQPMessage $msg, $requeue = true)
    {
        $msg->nack($requeue);
    }
    
    public function reject(AMQPMessage $msg, $requeue = false)
    {
        $msg->reject($requeue);
    }
    
    public function flushAcks()
    {
        if (empty($this->pendingAcks)) {
            return;
        }
        
        $lastTag = end($this->pendingAcks);
        $this->channel->basic_ack($lastTag, true);  // multiple = true
        
        $count = count($this->pendingAcks);
        $this->pendingAcks = [];
        
        return $count;
    }
    
    public function getPendingCount()
    {
        return count($this->pendingAcks);
    }
}

实际应用场景

1. 可靠消息处理

php
<?php

class ReliableMessageConsumer
{
    private $channel;
    private $queueName;
    private $maxRetries = 3;
    
    public function __construct($channel, $queueName)
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
    }
    
    public function consume(callable $handler)
    {
        $this->channel->basic_qos(null, 1, null);
        
        $callback = function ($msg) use ($handler) {
            $headers = $this->getHeaders($msg);
            $retryCount = $headers['x-retry-count'] ?? 0;
            
            try {
                $data = json_decode($msg->getBody(), true);
                $handler($data);
                $msg->ack();
                
            } catch (Exception $e) {
                $this->handleError($msg, $e, $retryCount);
            }
        };
        
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function getHeaders($msg)
    {
        if (!$msg->has('application_headers')) {
            return [];
        }
        return $msg->get('application_headers')->getNativeData();
    }
    
    private function handleError($msg, $exception, $retryCount)
    {
        error_log("消息处理失败: " . $exception->getMessage());
        
        if ($retryCount < $this->maxRetries) {
            // 重新入队
            $msg->nack(true);
        } else {
            // 超过重试次数,发送到死信队列
            $msg->reject(false);
        }
    }
}

2. 事务性消息处理

php
<?php

class TransactionalConsumer
{
    private $channel;
    private $db;
    
    public function __construct($channel, $db)
    {
        $this->channel = $channel;
        $this->db = $db;
    }
    
    public function consume($queueName, callable $handler)
    {
        $this->channel->basic_qos(null, 1, null);
        
        $callback = function ($msg) use ($handler) {
            $data = json_decode($msg->getBody(), true);
            
            try {
                // 开始数据库事务
                $this->db->beginTransaction();
                
                // 执行业务逻辑
                $handler($data, $this->db);
                
                // 提交数据库事务
                $this->db->commit();
                
                // 确认消息
                $msg->ack();
                
                echo "消息处理成功\n";
                
            } catch (Exception $e) {
                // 回滚数据库事务
                $this->db->rollBack();
                
                // 拒绝消息
                $msg->nack(true);
                
                echo "消息处理失败: " . $e->getMessage() . "\n";
            }
        };
        
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

3. 优先级确认

php
<?php

class PriorityAckConsumer
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function consume($queueName)
    {
        $this->channel->basic_qos(null, 1, null);
        
        $callback = function ($msg) {
            $priority = $msg->get('priority') ?? 0;
            $data = json_decode($msg->getBody(), true);
            
            echo sprintf(
                "处理消息 [优先级: %d]: %s\n",
                $priority,
                json_encode($data)
            );
            
            // 根据优先级决定处理方式
            if ($priority >= 8) {
                // 高优先级消息立即处理并确认
                $this->processHighPriority($data);
                $msg->ack();
            } else {
                // 普通消息批量确认
                $this->processNormal($data);
                $msg->ack();
            }
        };
        
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function processHighPriority($data)
    {
        // 高优先级处理逻辑
    }
    
    private function processNormal($data)
    {
        // 普通处理逻辑
    }
}

常见问题与解决方案

问题 1: 消息重复消费

症状: 消息被多次处理

原因: 消费者处理完成后未确认,或确认失败

解决方案:

php
<?php

// 实现幂等性
$callback = function ($msg) {
    $data = json_decode($msg->getBody(), true);
    $messageId = $data['message_id'];
    
    // 使用 Redis 检查是否已处理
    if ($redis->exists("processed:{$messageId}")) {
        $msg->ack();  // 已处理,直接确认
        return;
    }
    
    // 处理消息
    processMessage($data);
    
    // 标记为已处理
    $redis->setex("processed:{$messageId}", 86400, '1');
    
    $msg->ack();
};

问题 2: 消息丢失

症状: 消息未被处理就消失了

原因: 使用了自动确认模式

解决方案:

php
<?php

// 使用手动确认模式
$channel->basic_consume(
    $queueName,
    '',
    false,
    false,  // no_ack = false,手动确认
    false,
    false,
    $callback
);

// 处理完成后再确认
$callback = function ($msg) {
    try {
        processMessage($msg);
        $msg->ack();  // 处理成功后确认
    } catch (Exception $e) {
        $msg->nack(true);  // 处理失败,重新入队
    }
};

问题 3: 消息积压

症状: 队列中消息越来越多

原因: 消费者处理速度慢,或 prefetch 设置过大

解决方案:

php
<?php

// 设置合理的 prefetch
$channel->basic_qos(null, 1, null);  // 每次只取一条

// 增加消费者数量
// 或优化处理逻辑

最佳实践建议

  1. 使用手动确认: 避免自动确认导致消息丢失
  2. 设置 prefetch: 合理设置预取数量
  3. 实现幂等性: 处理重复消息
  4. 异常处理: 区分可恢复和不可恢复错误
  5. 监控确认状态: 监控未确认消息数量

相关链接