Appearance
消息确认机制
概述
消息确认机制是 RabbitMQ 保证消息可靠传递的核心机制。它确保消息从生产者到 Broker,以及从 Broker 到消费者的过程中不会丢失。
核心原理
消息确认流程
mermaid
sequenceDiagram
participant P as 生产者
participant B as Broker
participant C as 消费者
Note over P,B: 生产者确认
P->>B: 发送消息
B->>B: 持久化消息
B-->>P: 确认 (ACK)
Note over B,C: 消费者确认
B->>C: 投递消息
C->>C: 处理消息
C-->>B: 确认 (ACK)
B->>B: 删除消息确认类型
mermaid
graph TD
subgraph 生产者确认
PA[Publisher Confirm] --> PS[同步确认]
PA --> PA2[异步确认]
PA --> PB[批量确认]
end
subgraph 消费者确认
CA[Consumer ACK] --> ACK[确认]
CA --> NACK[拒绝]
CA --> REJECT[拒绝不重入]
end
subgraph 事务
TX[Transaction] --> TX1[开启事务]
TX1 --> TX2[发送消息]
TX2 --> TX3[提交事务]
end确认方式对比
| 方式 | 可靠性 | 性能 | 复杂度 |
|---|---|---|---|
| 自动确认 | 低 | 高 | 低 |
| 手动确认 | 中 | 中 | 中 |
| 事务 | 高 | 低 | 高 |
| 发布确认 | 高 | 中 | 中 |
PHP 代码示例
自动确认模式
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'auto-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);
echo "自动确认模式消费者\n";
$callback = function (AMQPMessage $msg) {
echo "收到消息: " . $msg->getBody() . "\n";
// 消息投递后立即确认,无论处理是否成功
};
// 第三个参数 true 表示自动确认
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();手动确认模式
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'manual-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);
echo "手动确认模式消费者\n";
$callback = function (AMQPMessage $msg) {
try {
$data = json_decode($msg->getBody(), true);
echo "处理消息: " . json_encode($data) . "\n";
// 业务处理
processMessage($data);
// 确认消息
$msg->ack();
echo "消息已确认\n";
} catch (Exception $e) {
echo "处理失败: " . $e->getMessage() . "\n";
// 拒绝消息,重新入队
$msg->nack(true);
echo "消息已拒绝,重新入队\n";
}
};
// 设置 prefetch,一次只处理一条消息
$channel->basic_qos(null, 1, null);
// 第三个参数 false 表示手动确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
function processMessage($data)
{
// 模拟业务处理
if (rand(1, 10) === 1) {
throw new Exception("随机失败");
}
}
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();批量确认
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'batch-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);
echo "批量确认模式消费者\n";
$batchSize = 10;
$processedCount = 0;
$deliveryTags = [];
$callback = function (AMQPMessage $msg) use (&$processedCount, &$deliveryTags, $batchSize, $channel) {
$deliveryTags[] = $msg->getDeliveryTag();
$processedCount++;
echo "处理消息 #{$processedCount}\n";
// 处理消息
processMessage(json_decode($msg->getBody(), true));
// 达到批量大小后批量确认
if ($processedCount >= $batchSize) {
// 批量确认(确认最后一个 tag,multiple=true 会确认之前的所有消息)
$channel->basic_ack($msg->getDeliveryTag(), true);
echo "批量确认 {$processedCount} 条消息\n";
$processedCount = 0;
$deliveryTags = [];
}
};
$channel->basic_qos(null, $batchSize, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
function processMessage($data)
{
// 业务处理
}
while ($channel->is_consuming()) {
$channel->wait();
}
// 处理剩余消息
if ($processedCount > 0) {
$channel->basic_ack(end($deliveryTags), true);
echo "确认剩余 {$processedCount} 条消息\n";
}
$channel->close();
$connection->close();消息拒绝
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'reject-queue';
$channel->queue_declare($queueName, false, true, false, false);
echo "消息拒绝示例\n";
$callback = function (AMQPMessage $msg) {
$data = json_decode($msg->getBody(), true);
$retryCount = $msg->get('application_headers')
? $msg->get('application_headers')->getNativeData()['x-retry-count'] ?? 0
: 0;
try {
processMessage($data);
$msg->ack();
echo "消息处理成功\n";
} catch (RecoverableException $e) {
// 可恢复错误,重新入队
if ($retryCount < 3) {
$msg->nack(true); // requeue = true
echo "消息重新入队,重试次数: " . ($retryCount + 1) . "\n";
} else {
// 超过重试次数,发送到死信队列
$msg->nack(false); // requeue = false
echo "消息超过重试次数,已拒绝\n";
}
} catch (UnrecoverableException $e) {
// 不可恢复错误,直接拒绝
$msg->reject(false); // requeue = false
echo "消息无法处理,已拒绝\n";
}
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
class RecoverableException extends Exception {}
class UnrecoverableException extends Exception {}
function processMessage($data)
{
// 业务处理
}
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();确认管理类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class AckManager
{
private $channel;
private $pendingAcks = [];
private $batchSize;
public function __construct($channel, $batchSize = 10)
{
$this->channel = $channel;
$this->batchSize = $batchSize;
}
public function ack(AMQPMessage $msg)
{
$deliveryTag = $msg->getDeliveryTag();
$this->pendingAcks[] = $deliveryTag;
if (count($this->pendingAcks) >= $this->batchSize) {
$this->flushAcks();
}
}
public function ackImmediately(AMQPMessage $msg)
{
$msg->ack();
}
public function nack(AMQPMessage $msg, $requeue = true)
{
$msg->nack($requeue);
}
public function reject(AMQPMessage $msg, $requeue = false)
{
$msg->reject($requeue);
}
public function flushAcks()
{
if (empty($this->pendingAcks)) {
return;
}
$lastTag = end($this->pendingAcks);
$this->channel->basic_ack($lastTag, true); // multiple = true
$count = count($this->pendingAcks);
$this->pendingAcks = [];
return $count;
}
public function getPendingCount()
{
return count($this->pendingAcks);
}
}实际应用场景
1. 可靠消息处理
php
<?php
class ReliableMessageConsumer
{
private $channel;
private $queueName;
private $maxRetries = 3;
public function __construct($channel, $queueName)
{
$this->channel = $channel;
$this->queueName = $queueName;
}
public function consume(callable $handler)
{
$this->channel->basic_qos(null, 1, null);
$callback = function ($msg) use ($handler) {
$headers = $this->getHeaders($msg);
$retryCount = $headers['x-retry-count'] ?? 0;
try {
$data = json_decode($msg->getBody(), true);
$handler($data);
$msg->ack();
} catch (Exception $e) {
$this->handleError($msg, $e, $retryCount);
}
};
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function getHeaders($msg)
{
if (!$msg->has('application_headers')) {
return [];
}
return $msg->get('application_headers')->getNativeData();
}
private function handleError($msg, $exception, $retryCount)
{
error_log("消息处理失败: " . $exception->getMessage());
if ($retryCount < $this->maxRetries) {
// 重新入队
$msg->nack(true);
} else {
// 超过重试次数,发送到死信队列
$msg->reject(false);
}
}
}2. 事务性消息处理
php
<?php
class TransactionalConsumer
{
private $channel;
private $db;
public function __construct($channel, $db)
{
$this->channel = $channel;
$this->db = $db;
}
public function consume($queueName, callable $handler)
{
$this->channel->basic_qos(null, 1, null);
$callback = function ($msg) use ($handler) {
$data = json_decode($msg->getBody(), true);
try {
// 开始数据库事务
$this->db->beginTransaction();
// 执行业务逻辑
$handler($data, $this->db);
// 提交数据库事务
$this->db->commit();
// 确认消息
$msg->ack();
echo "消息处理成功\n";
} catch (Exception $e) {
// 回滚数据库事务
$this->db->rollBack();
// 拒绝消息
$msg->nack(true);
echo "消息处理失败: " . $e->getMessage() . "\n";
}
};
$this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}3. 优先级确认
php
<?php
class PriorityAckConsumer
{
private $channel;
public function __construct($channel)
{
$this->channel = $channel;
}
public function consume($queueName)
{
$this->channel->basic_qos(null, 1, null);
$callback = function ($msg) {
$priority = $msg->get('priority') ?? 0;
$data = json_decode($msg->getBody(), true);
echo sprintf(
"处理消息 [优先级: %d]: %s\n",
$priority,
json_encode($data)
);
// 根据优先级决定处理方式
if ($priority >= 8) {
// 高优先级消息立即处理并确认
$this->processHighPriority($data);
$msg->ack();
} else {
// 普通消息批量确认
$this->processNormal($data);
$msg->ack();
}
};
$this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function processHighPriority($data)
{
// 高优先级处理逻辑
}
private function processNormal($data)
{
// 普通处理逻辑
}
}常见问题与解决方案
问题 1: 消息重复消费
症状: 消息被多次处理
原因: 消费者处理完成后未确认,或确认失败
解决方案:
php
<?php
// 实现幂等性
$callback = function ($msg) {
$data = json_decode($msg->getBody(), true);
$messageId = $data['message_id'];
// 使用 Redis 检查是否已处理
if ($redis->exists("processed:{$messageId}")) {
$msg->ack(); // 已处理,直接确认
return;
}
// 处理消息
processMessage($data);
// 标记为已处理
$redis->setex("processed:{$messageId}", 86400, '1');
$msg->ack();
};问题 2: 消息丢失
症状: 消息未被处理就消失了
原因: 使用了自动确认模式
解决方案:
php
<?php
// 使用手动确认模式
$channel->basic_consume(
$queueName,
'',
false,
false, // no_ack = false,手动确认
false,
false,
$callback
);
// 处理完成后再确认
$callback = function ($msg) {
try {
processMessage($msg);
$msg->ack(); // 处理成功后确认
} catch (Exception $e) {
$msg->nack(true); // 处理失败,重新入队
}
};问题 3: 消息积压
症状: 队列中消息越来越多
原因: 消费者处理速度慢,或 prefetch 设置过大
解决方案:
php
<?php
// 设置合理的 prefetch
$channel->basic_qos(null, 1, null); // 每次只取一条
// 增加消费者数量
// 或优化处理逻辑最佳实践建议
- 使用手动确认: 避免自动确认导致消息丢失
- 设置 prefetch: 合理设置预取数量
- 实现幂等性: 处理重复消息
- 异常处理: 区分可恢复和不可恢复错误
- 监控确认状态: 监控未确认消息数量
