Appearance
消费者确认
概述
消费者确认(Consumer Acknowledgements)是 RabbitMQ 保证消息可靠消费的机制。消费者在处理完消息后发送确认,Broker 才会删除消息,否则消息会被重新投递。
核心原理
确认流程
mermaid
sequenceDiagram
participant B as Broker
participant C as 消费者
B->>C: 投递消息
Note over C: 消息状态: unacked
alt 处理成功
C->>B: ACK
B->>B: 删除消息
else 处理失败-可重试
C->>B: NACK(requeue=true)
B->>B: 重新入队
else 处理失败-不可重试
C->>B: REJECT(requeue=false)
B->>B: 丢弃/死信
end确认方式
| 方式 | 方法 | 说明 |
|---|---|---|
| 自动确认 | no_ack=true | 消息投递后立即确认 |
| 手动确认 | basic_ack | 确认单条或批量消息 |
| 拒绝重入 | basic_nack | 拒绝并可选择重新入队 |
| 拒绝丢弃 | basic_reject | 拒绝单条消息 |
消息状态
mermaid
stateDiagram-v2
[*] --> Ready: 消息入队
Ready --> Unacked: 投递给消费者
Unacked --> Ready: NACK(requeue=true)
Unacked --> Acked: ACK
Unacked --> Dead: REJECT/NACK(requeue=false)
Acked --> [*]: 删除
Dead --> [*]: 死信/丢弃PHP 代码示例
自动确认模式
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'auto-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);
echo "自动确认模式消费者\n";
echo "警告: 消息投递后立即确认,可能丢失消息\n";
$callback = function (AMQPMessage $msg) {
$data = json_decode($msg->getBody(), true);
echo "收到消息: " . json_encode($data) . "\n";
// 处理消息(即使处理失败,消息也已确认)
processMessage($data);
};
// no_ack = true,自动确认
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();手动确认模式
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'manual-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);
echo "手动确认模式消费者\n";
// 设置 prefetch,一次只处理一条消息
$channel->basic_qos(null, 1, null);
$callback = function (AMQPMessage $msg) {
$data = json_decode($msg->getBody(), true);
echo "收到消息: " . json_encode($data) . "\n";
try {
// 处理消息
$result = processMessage($data);
if ($result) {
// 处理成功,确认消息
$msg->ack();
echo "消息已确认\n";
} else {
// 处理失败,拒绝并重新入队
$msg->nack(true);
echo "消息已拒绝,重新入队\n";
}
} catch (Exception $e) {
echo "处理异常: " . $e->getMessage() . "\n";
// 根据异常类型决定处理方式
if ($e instanceof RecoverableException) {
// 可恢复异常,重新入队
$msg->nack(true);
} else {
// 不可恢复异常,丢弃消息
$msg->reject(false);
}
}
};
// no_ack = false,手动确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
function processMessage($data)
{
// 业务处理逻辑
return true;
}
class RecoverableException extends Exception {}
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();批量确认
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'batch-ack-queue';
$channel->queue_declare($queueName, false, true, false, false);
echo "批量确认模式消费者\n";
$batchSize = 10;
$processedCount = 0;
$lastDeliveryTag = 0;
$callback = function (AMQPMessage $msg) use (&$processedCount, &$lastDeliveryTag, $batchSize, $channel) {
$data = json_decode($msg->getBody(), true);
$deliveryTag = $msg->getDeliveryTag();
echo "处理消息 #{$deliveryTag}\n";
// 处理消息
processMessage($data);
$processedCount++;
$lastDeliveryTag = $deliveryTag;
// 达到批次大小时批量确认
if ($processedCount >= $batchSize) {
// multiple = true,确认到当前 tag 的所有消息
$channel->basic_ack($lastDeliveryTag, true);
echo "批量确认 {$processedCount} 条消息\n";
$processedCount = 0;
$lastDeliveryTag = 0;
}
};
$channel->basic_qos(null, $batchSize, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
function processMessage($data)
{
// 业务处理
}
while ($channel->is_consuming()) {
$channel->wait();
}
// 确认剩余消息
if ($processedCount > 0) {
$channel->basic_ack($lastDeliveryTag, true);
echo "确认剩余 {$processedCount} 条消息\n";
}
$channel->close();
$connection->close();消费者确认管理类
php
<?php
use PhpAmqpLib\Message\AMQPMessage;
class ConsumerAckManager
{
private $channel;
private $unackedMessages = [];
private $batchSize;
private $autoFlush;
public function __construct($channel, $batchSize = 1, $autoFlush = true)
{
$this->channel = $channel;
$this->batchSize = $batchSize;
$this->autoFlush = $autoFlush;
}
public function track(AMQPMessage $msg)
{
$deliveryTag = $msg->getDeliveryTag();
$this->unackedMessages[$deliveryTag] = [
'msg' => $msg,
'received_at' => time()
];
}
public function ack(AMQPMessage $msg, $multiple = false)
{
$deliveryTag = $msg->getDeliveryTag();
if ($multiple) {
// 批量确认
$this->channel->basic_ack($deliveryTag, true);
foreach ($this->unackedMessages as $tag => $info) {
if ($tag <= $deliveryTag) {
unset($this->unackedMessages[$tag]);
}
}
} else {
// 单条确认
$msg->ack();
unset($this->unackedMessages[$deliveryTag]);
}
}
public function nack(AMQPMessage $msg, $requeue = true)
{
$msg->nack($requeue);
unset($this->unackedMessages[$msg->getDeliveryTag()]);
}
public function reject(AMQPMessage $msg, $requeue = false)
{
$msg->reject($requeue);
unset($this->unackedMessages[$msg->getDeliveryTag()]);
}
public function flush()
{
if (empty($this->unackedMessages)) {
return;
}
$lastTag = max(array_keys($this->unackedMessages));
$this->channel->basic_ack($lastTag, true);
$this->unackedMessages = [];
}
public function getUnackedCount()
{
return count($this->unackedMessages);
}
public function getUnackedMessages()
{
return $this->unackedMessages;
}
}实际应用场景
1. 可靠消息处理
php
<?php
class ReliableConsumer
{
private $channel;
private $queueName;
private $ackManager;
private $maxRetries = 3;
public function __construct($channel, $queueName)
{
$this->channel = $channel;
$this->queueName = $queueName;
$this->ackManager = new ConsumerAckManager($channel);
}
public function consume(callable $handler)
{
$this->channel->basic_qos(null, 1, null);
$callback = function ($msg) use ($handler) {
$this->ackManager->track($msg);
$data = json_decode($msg->getBody(), true);
$retryCount = $this->getRetryCount($msg);
try {
$handler($data);
$this->ackManager->ack($msg);
} catch (RecoverableException $e) {
if ($retryCount < $this->maxRetries) {
$this->ackManager->nack($msg, true);
echo "消息重新入队,重试 " . ($retryCount + 1) . "/{$this->maxRetries}\n";
} else {
$this->ackManager->reject($msg, false);
echo "消息超过最大重试次数,已拒绝\n";
}
} catch (UnrecoverableException $e) {
$this->ackManager->reject($msg, false);
echo "不可恢复错误,消息已拒绝\n";
}
};
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function getRetryCount($msg)
{
if (!$msg->has('application_headers')) {
return 0;
}
$headers = $msg->get('application_headers')->getNativeData();
return $headers['x-retry-count'] ?? 0;
}
}
class RecoverableException extends Exception {}
class UnrecoverableException extends Exception {}2. 事务性消费
php
<?php
class TransactionalConsumer
{
private $channel;
private $db;
public function __construct($channel, $db)
{
$this->channel = $channel;
$this->db = $db;
}
public function consume($queueName, callable $handler)
{
$this->channel->basic_qos(null, 1, null);
$callback = function ($msg) use ($handler) {
$data = json_decode($msg->getBody(), true);
try {
// 开始数据库事务
$this->db->beginTransaction();
// 执行业务逻辑
$handler($data, $this->db);
// 提交事务
$this->db->commit();
// 确认消息
$msg->ack();
echo "消息处理成功\n";
} catch (Exception $e) {
// 回滚事务
$this->db->rollBack();
// 拒绝消息
$msg->nack(true);
echo "消息处理失败: " . $e->getMessage() . "\n";
}
};
$this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}3. 多消费者负载均衡
php
<?php
class LoadBalancedConsumer
{
private $channel;
private $queueName;
private $consumerCount;
public function __construct($channel, $queueName, $consumerCount = 1)
{
$this->channel = $channel;
$this->queueName = $queueName;
$this->consumerCount = $consumerCount;
}
public function consume(callable $handler)
{
// 设置公平分发
$this->channel->basic_qos(null, 1, null);
for ($i = 0; $i < $this->consumerCount; $i++) {
$consumerTag = 'consumer-' . $i;
$callback = function ($msg) use ($handler, $consumerTag) {
$data = json_decode($msg->getBody(), true);
echo "[{$consumerTag}] 处理消息\n";
try {
$handler($data);
$msg->ack();
} catch (Exception $e) {
$msg->nack(true);
}
};
$this->channel->basic_consume(
$this->queueName,
$consumerTag,
false,
false,
false,
false,
$callback
);
}
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}常见问题与解决方案
问题 1: 消息重复消费
症状: 同一消息被多次处理
解决方案:
php
<?php
// 实现幂等性
$callback = function ($msg) use ($redis) {
$data = json_decode($msg->getBody(), true);
$messageId = $data['message_id'];
// 检查是否已处理
if ($redis->exists("processed:{$messageId}")) {
$msg->ack(); // 已处理,直接确认
return;
}
// 处理消息
processMessage($data);
// 标记为已处理
$redis->setex("processed:{$messageId}", 86400, 1);
$msg->ack();
};问题 2: 消息积压
症状: 队列中消息越来越多
解决方案:
php
<?php
// 设置合理的 prefetch
$channel->basic_qos(null, 10, null); // 每次预取 10 条
// 增加消费者数量
// 或优化处理逻辑
// 监控队列深度
list(, $messageCount,) = $channel->queue_declare($queueName, true);
echo "队列消息数: {$messageCount}\n";问题 3: 消费者断开消息丢失
症状: 消费者断开后消息丢失
解决方案:
php
<?php
// 使用手动确认模式
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 处理完成后再确认
$callback = function ($msg) {
try {
processMessage($msg);
$msg->ack(); // 处理成功后确认
} catch (Exception $e) {
$msg->nack(true); // 处理失败,重新入队
}
};最佳实践建议
- 使用手动确认: 避免自动确认导致消息丢失
- 设置 prefetch: 合理设置预取数量,实现公平分发
- 实现幂等性: 处理重复消息
- 异常分类处理: 区分可恢复和不可恢复错误
- 监控确认状态: 监控未确认消息数量
