Appearance
生产者确认
概述
生产者确认(Publisher Confirms)是 RabbitMQ 提供的一种机制,用于确认生产者发送的消息已被 Broker 成功处理。这是保证消息可靠投递的重要手段。
核心原理
确认流程
mermaid
sequenceDiagram
participant P as 生产者
participant B as Broker
P->>B: confirm_select()
B-->>P: 确认进入确认模式
P->>B: 发送消息 1
P->>B: 发送消息 2
P->>B: 发送消息 3
B->>B: 处理消息
B-->>P: ACK (消息 1)
B-->>P: ACK (消息 2)
B-->>P: NACK (消息 3)
Note over P: 根据确认结果处理
确认模式对比
| 模式 | 性能 | 可靠性 | 复杂度 |
|---|---|---|---|
| 不确认(发后即忘) | 最高 | 最低 | 最低 |
| 事务 | 最低 | 最高 | 高 |
| 同步确认 | 中 | 高 | 中 |
| 异步确认 | 高 | 高 | 高 |
确认结果
- ACK: 消息已被成功处理
- NACK: 消息处理失败(如内部错误)
PHP 代码示例
启用发布确认模式
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Minimal publisher-confirm example: publish one persistent message and
// report whether the broker acknowledged it.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

try {
    // Declare the target queue; the third argument (durable) is true.
    $queueName = 'confirm-queue';
    $channel->queue_declare($queueName, false, true, false, false);

    // Put the channel into publisher-confirm mode.
    $channel->confirm_select();
    echo "已进入发布确认模式\n";

    // wait_for_pending_acks() returns normally once the broker has answered,
    // whether the answer was an ACK or a NACK. A NACK is only reported through
    // this handler, so record it here to avoid printing a false "confirmed".
    $nacked = false;
    $channel->set_nack_handler(function (AMQPMessage $message) use (&$nacked) {
        $nacked = true;
    });

    // Publish one persistent message through the default exchange.
    $message = new AMQPMessage(
        json_encode(['data' => 'test']),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    $channel->basic_publish($message, '', $queueName);

    // Wait synchronously for the broker's answer. The 30 second limit keeps a
    // stalled broker from blocking the script forever.
    try {
        $channel->wait_for_pending_acks(30);
        if ($nacked) {
            echo "消息确认失败: Broker 返回 NACK\n";
        } else {
            echo "消息已确认\n";
        }
    } catch (Exception $e) {
        echo "消息确认失败: " . $e->getMessage() . "\n";
    }
} finally {
    // Release the channel and connection even if declaring or publishing threw.
    $channel->close();
    $connection->close();
}
同步确认
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Synchronous confirmation: publish messages one at a time and wait for the
// broker's answer after each of them.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

try {
    $queueName = 'sync-confirm-queue';
    $channel->queue_declare($queueName, false, true, false, false);
    $channel->confirm_select();

    // A NACK does not make wait_for_pending_acks() throw; it is only delivered
    // to this handler. The flag is reset before every publish.
    $nacked = false;
    $channel->set_nack_handler(function (AMQPMessage $message) use (&$nacked) {
        $nacked = true;
    });

    // Publish several messages.
    $messageCount = 10;
    for ($i = 1; $i <= $messageCount; $i++) {
        $message = new AMQPMessage(
            json_encode(['id' => $i, 'data' => 'message ' . $i]),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );

        $nacked = false;
        $channel->basic_publish($message, '', $queueName);
        echo "消息 {$i} 已发送\n";

        // Confirm each message individually, with a bounded wait.
        try {
            $channel->wait_for_pending_acks(30);
            if ($nacked) {
                echo "消息 {$i} 确认失败: Broker 返回 NACK\n";
            } else {
                echo "消息 {$i} 已确认\n";
            }
        } catch (Exception $e) {
            echo "消息 {$i} 确认失败: " . $e->getMessage() . "\n";
        }
    }
} finally {
    // Always release the channel and connection.
    $channel->close();
    $connection->close();
}
批量确认
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Batch confirmation: publish messages in groups of $batchSize and wait for
// the broker's answers once per group.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

try {
    $queueName = 'batch-confirm-queue';
    $channel->queue_declare($queueName, false, true, false, false);
    $channel->confirm_select();

    // Count NACKs per batch. wait_for_pending_acks() does not throw for them,
    // so without this counter a rejected message would be reported as confirmed.
    $nackedInBatch = 0;
    $channel->set_nack_handler(function (AMQPMessage $message) use (&$nackedInBatch) {
        $nackedInBatch++;
    });

    $batchSize = 100;
    $totalMessages = 1000;

    for ($i = 1; $i <= $totalMessages; $i++) {
        $message = new AMQPMessage(
            json_encode(['id' => $i]),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        $channel->basic_publish($message, '', $queueName);

        // Wait for confirmations after every full batch.
        if ($i % $batchSize === 0) {
            try {
                $channel->wait_for_pending_acks(30);
                if ($nackedInBatch > 0) {
                    echo "批次确认失败: {$nackedInBatch} 条消息被 Broker 拒绝\n";
                } else {
                    echo "批次 " . ($i / $batchSize) . " 已确认 ({$batchSize} 条消息)\n";
                }
            } catch (Exception $e) {
                echo "批次确认失败: " . $e->getMessage() . "\n";
            }
            $nackedInBatch = 0;
        }
    }

    // Confirm the final, partially filled batch with the same error handling
    // as the full batches.
    if ($totalMessages % $batchSize !== 0) {
        try {
            $channel->wait_for_pending_acks(30);
            if ($nackedInBatch > 0) {
                echo "剩余消息确认失败: {$nackedInBatch} 条消息被 Broker 拒绝\n";
            } else {
                echo "剩余消息已确认\n";
            }
        } catch (Exception $e) {
            echo "剩余消息确认失败: " . $e->getMessage() . "\n";
        }
    }
} finally {
    // Always release the channel and connection.
    $channel->close();
    $connection->close();
}
异步确认
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Callback-based confirmation: publish everything first, then let the
// ack/nack handlers tally the broker's answers.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

try {
    $queueName = 'async-confirm-queue';
    $channel->queue_declare($queueName, false, true, false, false);
    $channel->confirm_select();

    // Confirmation statistics.
    $confirmedCount = 0;
    $nackedCount = 0;
    $publishedCount = 0;

    // php-amqplib invokes the handler once per published message and passes
    // that AMQPMessage as the only argument. A "multiple" acknowledgement from
    // the broker is expanded by the library into one call per message, so
    // counting calls counts messages.
    // getDeliveryTag() is the php-amqplib 3.x accessor; older releases expose
    // the tag via $message->delivery_info['delivery_tag'] (verify for your version).
    $channel->set_ack_handler(function (AMQPMessage $message) use (&$confirmedCount) {
        $confirmedCount++;
        echo "消息确认: tag " . $message->getDeliveryTag() . "\n";
    });

    // Called once for every message the broker rejected.
    $channel->set_nack_handler(function (AMQPMessage $message) use (&$nackedCount) {
        $nackedCount++;
        echo "消息拒绝: tag " . $message->getDeliveryTag() . "\n";
    });

    // Publish all messages without waiting in between.
    $messageCount = 100;
    for ($i = 1; $i <= $messageCount; $i++) {
        $message = new AMQPMessage(
            json_encode(['id' => $i]),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        $channel->basic_publish($message, '', $queueName);
        $publishedCount++;
    }

    echo "已发送 {$publishedCount} 条消息\n";

    // Wait (at most 30 seconds) for the outstanding confirmations; the handlers
    // above run while the channel is waiting here.
    $channel->wait_for_pending_acks(30);

    echo "确认统计:\n";
    echo " 已确认: {$confirmedCount}\n";
    echo " 已拒绝: {$nackedCount}\n";
} finally {
    // Always release the channel and connection.
    $channel->close();
    $connection->close();
}
发布确认管理类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Publishes JSON messages in confirm mode and tracks each one as pending,
 * confirmed or failed.
 *
 * Messages are tracked by their message_id property instead of by delivery
 * tag: php-amqplib hands the originally published AMQPMessage to the ack/nack
 * handlers, so the id can be read back from the message itself.
 */
class PublisherConfirmManager
{
    /** @var object Channel used for publishing (expected to be a php-amqplib AMQPChannel). */
    private $channel;

    /** @var array<string, array> Published but not yet answered, keyed by message_id. */
    private $pendingMessages = [];

    /** @var array<string, array> Acknowledged by the broker, keyed by message_id. */
    private $confirmedMessages = [];

    /** @var array<string, array> Rejected (NACKed) by the broker, keyed by message_id. */
    private $failedMessages = [];

    /** @var int Number of pending messages that triggers an automatic wait. */
    private $batchSize;

    /**
     * @param object $channel   Channel to publish on; it is switched to confirm mode here.
     * @param int    $batchSize Pending-message count at which publish() waits for confirms.
     */
    public function __construct($channel, $batchSize = 100)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->enableConfirmMode();
    }

    /**
     * Enables confirm mode and registers the ack/nack handlers.
     *
     * The handlers take a single AMQPMessage argument, which is how php-amqplib
     * calls them (once per message, also for "multiple" acknowledgements).
     */
    private function enableConfirmMode()
    {
        $this->channel->confirm_select();
        $this->channel->set_ack_handler(function (AMQPMessage $message) {
            $this->handleAck($message);
        });
        $this->channel->set_nack_handler(function (AMQPMessage $message) {
            $this->handleNack($message);
        });
    }

    /**
     * Publishes $data as a persistent JSON message and records it as pending.
     *
     * @param string      $exchange   Target exchange.
     * @param string      $routingKey Routing key.
     * @param array       $data       Payload; 'message_id' is added to the encoded body.
     * @param string|null $messageId  Id to use; a unique one is generated when null.
     *
     * @return string The message id that was used.
     */
    public function publish($exchange, $routingKey, $data, $messageId = null)
    {
        $messageId = (string) ($messageId ?? uniqid('msg-', true));
        $message = new AMQPMessage(
            json_encode(array_merge($data, ['message_id' => $messageId])),
            [
                'content_type' => 'application/json',
                'message_id' => $messageId,
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );

        // A re-published id starts over: it must be in exactly one bucket.
        unset($this->confirmedMessages[$messageId], $this->failedMessages[$messageId]);

        // Exchange and routing key are stored so retryFailed() can resend the
        // message to where it was originally addressed.
        $this->pendingMessages[$messageId] = [
            'message_id' => $messageId,
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'data' => $data,
            'published_at' => time()
        ];

        try {
            $this->channel->basic_publish($message, $exchange, $routingKey);
        } catch (Exception $e) {
            // Nothing was sent, so no confirmation will ever arrive for it.
            unset($this->pendingMessages[$messageId]);
            throw $e;
        }

        // Wait for confirmations once a full batch is pending.
        if (count($this->pendingMessages) >= $this->batchSize) {
            $this->waitForConfirms();
        }

        return $messageId;
    }

    /**
     * Blocks until the broker has answered all outstanding messages.
     *
     * @param int $timeout Timeout in seconds, passed to wait_for_pending_acks_returns().
     */
    public function waitForConfirms($timeout = 30)
    {
        $this->channel->wait_for_pending_acks_returns($timeout);
    }

    /** Moves an acknowledged message from pending to confirmed. */
    private function handleAck(AMQPMessage $message)
    {
        $info = $this->takePending($message);
        if ($info !== null) {
            $this->confirmedMessages[$info['message_id']] = $info;
        }
    }

    /** Moves a rejected message from pending to failed. */
    private function handleNack(AMQPMessage $message)
    {
        $info = $this->takePending($message);
        if ($info !== null) {
            $this->failedMessages[$info['message_id']] = $info;
        }
    }

    /**
     * Removes and returns the pending record that belongs to $message.
     *
     * @return array|null Null when the message carries no message_id or was not
     *                    published through this manager.
     */
    private function takePending(AMQPMessage $message)
    {
        if (!$message->has('message_id')) {
            return null;
        }

        $messageId = $message->get('message_id');
        if (!isset($this->pendingMessages[$messageId])) {
            return null;
        }

        $info = $this->pendingMessages[$messageId];
        unset($this->pendingMessages[$messageId]);

        return $info;
    }

    /**
     * @return array{pending:int, confirmed:int, failed:int} Message count per state.
     */
    public function getStats()
    {
        return [
            'pending' => count($this->pendingMessages),
            'confirmed' => count($this->confirmedMessages),
            'failed' => count($this->failedMessages)
        ];
    }

    /**
     * @return array<string, array> Failed message records keyed by message_id.
     */
    public function getFailedMessages()
    {
        return $this->failedMessages;
    }

    /**
     * Re-publishes every failed message to its original exchange and routing key.
     *
     * The failed list is snapshotted and cleared before publishing: publish()
     * may wait for confirms when a batch fills up, and NACKs recorded during
     * that wait must not be wiped afterwards.
     */
    public function retryFailed()
    {
        $toRetry = $this->failedMessages;
        $this->failedMessages = [];

        try {
            foreach ($toRetry as $messageId => $info) {
                $this->publish($info['exchange'], $info['routing_key'], $info['data'], $messageId);
                unset($toRetry[$messageId]);
            }
        } catch (Exception $e) {
            // Keep whatever could not be re-published so it is not lost.
            $this->failedMessages += $toRetry;
            throw $e;
        }
    }
}
实际应用场景
1. 可靠消息发送
php
<?php
/**
 * Sends a single message and retries it until the broker confirms it or the
 * retry budget is used up.
 */
class ReliablePublisher
{
    /** @var PublisherConfirmManager Confirm tracker owned exclusively by this publisher. */
    private $confirmManager;

    /** @var int Maximum number of publish attempts per message. */
    private $maxRetries = 3;

    /**
     * @param object $channel Channel handed to a new PublisherConfirmManager.
     */
    public function __construct($channel)
    {
        $this->confirmManager = new PublisherConfirmManager($channel);
    }

    /**
     * Publishes $data and waits for the broker's answer, retrying on rejection.
     *
     * Every attempt reuses the same message id so consumers can de-duplicate.
     * Exactly one message is in flight at a time (each publish is followed by a
     * wait), so an attempt succeeded if and only if the confirmed counter grew.
     * Exceptions from waiting (for example a timeout) propagate to the caller.
     *
     * @param string $exchange   Target exchange.
     * @param string $routingKey Routing key.
     * @param array  $data       Message payload.
     *
     * @return string The id of the confirmed message.
     *
     * @throws RuntimeException When the message was rejected on every attempt.
     */
    public function sendWithRetry($exchange, $routingKey, $data)
    {
        $messageId = uniqid('msg-');

        for ($attempts = 1; $attempts <= $this->maxRetries; $attempts++) {
            $confirmedBefore = $this->confirmManager->getStats()['confirmed'];

            $this->confirmManager->publish($exchange, $routingKey, $data, $messageId);
            $this->confirmManager->waitForConfirms();

            if ($this->confirmManager->getStats()['confirmed'] > $confirmedBefore) {
                echo "消息发送成功: {$messageId}\n";
                return $messageId;
            }

            // The loop itself re-publishes on the next iteration. Calling
            // retryFailed() here as well would send the message twice.
            echo "发送失败,重试 {$attempts}/{$this->maxRetries}\n";
        }

        throw new RuntimeException("消息发送失败,已重试 {$this->maxRetries} 次");
    }
}
2. 批量消息发送
php
<?php
/**
 * Buffers messages and publishes them in batches, waiting for the broker's
 * confirmations once per batch.
 */
class BatchPublisher
{
    /** @var object Channel used for publishing (expected to be a php-amqplib AMQPChannel). */
    private $channel;

    /** @var int Number of buffered messages that triggers an automatic flush. */
    private $batchSize;

    /** @var array[] Buffered items, each with 'exchange', 'routing_key' and 'data'. */
    private $batch = [];

    /** @var int Seconds to wait for confirmations in flush(). */
    private $confirmTimeout;

    /** @var SplObjectStorage Maps each message published by the current flush to its item. */
    private $inFlight;

    /** @var array[] Items whose message was rejected during the current flush. */
    private $rejected = [];

    /**
     * @param object $channel        Channel to publish on; switched to confirm mode here.
     * @param int    $batchSize      Buffer size that triggers a flush.
     * @param int    $confirmTimeout Seconds to wait for confirmations per flush.
     */
    public function __construct($channel, $batchSize = 100, $confirmTimeout = 30)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->confirmTimeout = $confirmTimeout;
        $this->inFlight = new SplObjectStorage();
        $this->channel->confirm_select();

        // wait_for_pending_acks() does not throw for a NACK. The library passes
        // the rejected message object (the same instance that was published) to
        // this handler, which lets us find the item it was built from.
        $this->channel->set_nack_handler(function (\PhpAmqpLib\Message\AMQPMessage $message) {
            if ($this->inFlight->contains($message)) {
                $this->rejected[] = $this->inFlight[$message];
            }
        });
    }

    /**
     * Buffers one message and flushes when the buffer is full.
     *
     * @param string $exchange   Target exchange.
     * @param string $routingKey Routing key.
     * @param mixed  $data       Payload, JSON-encoded when published.
     *
     * @return int Result of flush() when the buffer was full, otherwise 0.
     */
    public function add($exchange, $routingKey, $data)
    {
        $this->batch[] = [
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'data' => $data
        ];

        if (count($this->batch) >= $this->batchSize) {
            return $this->flush();
        }

        return 0;
    }

    /**
     * Publishes all buffered messages and waits for the broker's answers.
     *
     * Rejected messages are put back into the buffer for the next flush. If
     * publishing or waiting throws, the whole batch is kept, because the
     * outcome is unknown; a later flush may then deliver duplicates.
     *
     * @return int Number of messages the broker confirmed (0 for an empty buffer).
     */
    public function flush()
    {
        if (empty($this->batch)) {
            return 0;
        }

        $items = $this->batch;
        $this->batch = [];
        $this->inFlight = new SplObjectStorage();
        $this->rejected = [];

        try {
            foreach ($items as $item) {
                // Fully qualified because this snippet has no `use` statement.
                $message = new \PhpAmqpLib\Message\AMQPMessage(
                    json_encode($item['data']),
                    ['delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]
                );
                $this->inFlight[$message] = $item;
                $this->channel->basic_publish(
                    $message,
                    $item['exchange'],
                    $item['routing_key']
                );
            }

            $this->channel->wait_for_pending_acks($this->confirmTimeout);
        } catch (Exception $e) {
            $this->batch = $items;
            $this->inFlight = new SplObjectStorage();
            $this->rejected = [];
            throw $e;
        }

        $count = count($items);
        $rejectedCount = count($this->rejected);

        if ($rejectedCount === 0) {
            echo "批量发送 {$count} 条消息已确认\n";
        } else {
            $this->batch = $this->rejected;
            echo "批量发送 {$count} 条消息, 其中 {$rejectedCount} 条被 Broker 拒绝, 已保留待下次发送\n";
        }

        $this->inFlight = new SplObjectStorage();
        $this->rejected = [];

        return $count - $rejectedCount;
    }
}
3. 事务性消息发送
php
<?php
/**
 * Records a message in the database and publishes it inside one database
 * transaction, committing only after the broker has confirmed the message.
 *
 * Limitation visible from the statement order: the message is published before
 * the commit, so if the commit itself fails the broker already has the message.
 */
class TransactionalPublisher
{
    /** @var object Channel used for publishing (expected to be a php-amqplib AMQPChannel). */
    private $channel;

    /** @var object Database handle; used through beginTransaction/prepare/commit/rollBack (presumably PDO). */
    private $db;

    /** @var bool Set by the nack handler when the broker rejects the message in flight. */
    private $nacked = false;

    /**
     * @param object $channel Channel to publish on; switched to confirm mode here.
     * @param object $db      Database handle used for the message log.
     */
    public function __construct($channel, $db)
    {
        $this->channel = $channel;
        $this->db = $db;
        $this->channel->confirm_select();

        // wait_for_pending_acks() returns normally on a NACK; the rejection is
        // only reported through this handler.
        $this->channel->set_nack_handler(function (\PhpAmqpLib\Message\AMQPMessage $message) {
            $this->nacked = true;
        });
    }

    /**
     * Logs $data, publishes it to $queueName and commits once it is confirmed.
     *
     * @param string $queueName Queue name, used as routing key on the default exchange.
     * @param array  $data      Message payload.
     *
     * @return string The generated message id.
     *
     * @throws Throwable Whatever failed; the transaction is rolled back first.
     */
    public function sendInTransaction($queueName, $data)
    {
        $transactionOpen = false;

        try {
            // Start the database transaction.
            $this->db->beginTransaction();
            $transactionOpen = true;

            // Record the message in the database.
            $messageId = $this->recordMessage($data);

            // Publish the message. The class name is fully qualified because
            // this snippet has no `use` statement.
            $message = new \PhpAmqpLib\Message\AMQPMessage(
                json_encode(array_merge($data, ['message_id' => $messageId])),
                [
                    'message_id' => $messageId,
                    'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT
                ]
            );
            $this->nacked = false;
            $this->channel->basic_publish($message, '', $queueName);

            // Wait (at most 30 seconds) for the broker's answer.
            $this->channel->wait_for_pending_acks(30);
            if ($this->nacked) {
                throw new RuntimeException("Broker 拒绝了消息 (NACK): {$messageId}");
            }

            // Commit the database transaction.
            $this->db->commit();
            $transactionOpen = false;

            echo "消息发送成功: {$messageId}\n";
            return $messageId;
        } catch (Throwable $e) {
            // Roll back only if a transaction was actually started. A failure
            // of the rollback itself is ignored on purpose so the original
            // error is the one that reaches the caller.
            if ($transactionOpen) {
                try {
                    $this->db->rollBack();
                } catch (Throwable $rollbackError) {
                    // Intentionally ignored, see above.
                }
            }
            echo "消息发送失败: " . $e->getMessage() . "\n";
            throw $e;
        }
    }

    /**
     * Inserts a 'pending' row for the message into message_log.
     *
     * @param array $data Message payload, stored JSON-encoded.
     *
     * @return string The generated message id.
     */
    private function recordMessage($data)
    {
        $messageId = uniqid('msg-');
        $sql = "INSERT INTO message_log (message_id, data, status, created_at)
VALUES (?, ?, 'pending', NOW())";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([$messageId, json_encode($data)]);
        return $messageId;
    }
}
常见问题与解决方案
问题 1: 确认超时
症状: wait_for_pending_acks 超时
解决方案:
php
<?php
// Bound the wait for confirmations instead of blocking indefinitely.
try {
    // The argument is the timeout in seconds.
    $channel->wait_for_pending_acks(30);
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $timeoutError) {
    // Timeout-specific handling (for example a retry) belongs here.
    print "确认超时,可能需要重试\n";
}
问题 2: 内存占用过高
症状: 大量消息未确认导致内存增长
解决方案:
php
<?php
// Wait for confirmations at regular intervals so that unconfirmed messages
// do not accumulate.
$batchSize = 100;
$sentCount = 0;
foreach ($messages as $data) {
    // Build the message from the current payload. The class name is fully
    // qualified because this snippet has no `use` statement.
    // NOTE(review): assumes $messages holds JSON-encodable payloads, as in the
    // other examples; confirm against the caller.
    $message = new \PhpAmqpLib\Message\AMQPMessage(
        json_encode($data),
        ['delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    $channel->basic_publish($message, '', $queueName);
    $sentCount++;
    if ($sentCount % $batchSize === 0) {
        $channel->wait_for_pending_acks();
    }
}
// Confirm the final, partially filled batch.
if ($sentCount % $batchSize !== 0) {
    $channel->wait_for_pending_acks();
}
问题 3: NACK 处理
症状: 收到 NACK 但不知道如何处理
解决方案:
php
<?php
// php-amqplib calls the nack handler once per rejected message and passes the
// originally published AMQPMessage as its only argument.
$channel->set_nack_handler(function (\PhpAmqpLib\Message\AMQPMessage $message) {
    // Log the rejected message. getDeliveryTag() is the php-amqplib 3.x
    // accessor; verify it for the version in use.
    error_log("消息被拒绝: delivery_tag=" . $message->getDeliveryTag());
    // Decide whether to retry based on business requirements.
    // Note: a NACK usually signals an internal broker error, so an immediate
    // retry may not help.
});
// For critical messages, record them in a database and retry from there.
最佳实践建议
- 启用确认模式: 生产环境务必启用发布确认
- 批量确认: 平衡性能和可靠性
- 异步确认: 高吞吐场景使用异步确认
- 处理 NACK: 记录并告警 NACK 消息
- 合理超时: 设置合理的确认超时时间
