Appearance
批量确认
概述
批量确认机制允许生产者在连续发送一批消息之后,统一等待 Broker 对这批消息的确认,而不必逐条等待,从而显著提高消息发送的吞吐量和性能。它是在发布确认(Publisher Confirms)模式基础上实现的一种优化方案。
核心原理
批量确认流程
mermaid
sequenceDiagram
participant P as 生产者
participant B as Broker
P->>B: confirm_select()
loop 批量发送
P->>B: 发送消息 1
P->>B: 发送消息 2
P->>B: 发送消息 N
end
B-->>P: ACK(1..N)
Note over P: 批量确认完成
批量 vs 单条确认
| 方式 | 网络开销 | 吞吐量 | 复杂度 |
|---|---|---|---|
| 单条确认 | 高 | 低 | 低 |
| 批量确认 | 低 | 高 | 中 |
批量大小影响
mermaid
graph LR
subgraph 批量大小影响
S1[小批量<br/>100条] --> T1[高可靠性]
S2[中等批量<br/>500条] --> T2[平衡]
S3[大批量<br/>1000条] --> T3[高吞吐]
end
PHP 代码示例
基础批量确认
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Basic batch confirmation: publish persistent messages and block for broker
// confirmations once per batch instead of once per message.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'batch-confirm-queue';
// Third argument (durable) is true so the queue definition survives a broker restart.
$channel->queue_declare($queueName, false, true, false, false);

// Put the channel into publisher-confirm mode.
$channel->confirm_select();

$batchSize = 100;
$messageCount = 1000;
// Upper bound (seconds) for a single confirmation wait, so a stalled broker
// cannot block the script forever.
$confirmTimeout = 30;

echo "开始批量发送 {$messageCount} 条消息...\n";
$startTime = microtime(true);

for ($i = 1; $i <= $messageCount; $i++) {
    $message = new AMQPMessage(
        json_encode(['id' => $i, 'data' => 'message ' . $i]),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    $channel->basic_publish($message, '', $queueName);

    // Wait for confirmations only when a full batch has been published.
    if ($i % $batchSize === 0) {
        try {
            $channel->wait_for_pending_acks($confirmTimeout);
            $processed = $i / $batchSize;
            $seconds = microtime(true) - $startTime;
            // Guard the division: a zero interval would throw DivisionByZeroError on PHP 8.
            $rate = $seconds > 0 ? round($i / $seconds) : 0;
            echo "批次 {$processed} 完成,速率: {$rate} 消息/秒\n";
        } catch (Exception $e) {
            // Best effort: report the failed batch and keep publishing.
            echo "批次确认失败: " . $e->getMessage() . "\n";
        }
    }
}

// Confirm the trailing partial batch, with the same error handling as full batches.
if ($messageCount % $batchSize !== 0) {
    try {
        $channel->wait_for_pending_acks($confirmTimeout);
    } catch (Exception $e) {
        echo "批次确认失败: " . $e->getMessage() . "\n";
    }
}

// Compute the rate from the unrounded duration: the rounded value can be 0.00
// for a very fast run, which previously caused a division by zero.
$elapsedRaw = microtime(true) - $startTime;
$elapsed = round($elapsedRaw, 2);
$rate = $elapsedRaw > 0 ? round($messageCount / $elapsedRaw) : 0;

echo "\n完成统计:\n";
echo " 总消息数: {$messageCount}\n";
echo " 总耗时: {$elapsed} 秒\n";
echo " 平均速率: {$rate} 消息/秒\n";

$channel->close();
$connection->close();
异步批量确认
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Asynchronous batch confirmation: ack/nack handlers update counters while the
// publisher keeps sending, and the script only pauses periodically to let
// confirmations drain.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'async-batch-queue';
$channel->queue_declare($queueName, false, true, false, false);
$channel->confirm_select();

// Counters
$sentCount = 0;
$confirmedCount = 0;
$nackedCount = 0;
// message_id => true for every message published but not yet acked or nacked.
$pendingIds = [];

// php-amqplib calls the ack/nack handler once per settled message and passes the
// AMQPMessage that was published; "multiple" confirmations are expanded by the
// library, so each invocation accounts for exactly one message.
$channel->set_ack_handler(function (AMQPMessage $message) use (&$confirmedCount, &$pendingIds) {
    $confirmedCount++;
    unset($pendingIds[$message->get('message_id')]);
});

$channel->set_nack_handler(function (AMQPMessage $message) use (&$nackedCount, &$pendingIds) {
    $nackedCount++;
    unset($pendingIds[$message->get('message_id')]);
});

// Publish
$batchSize = 500;
$messageCount = 2000;

for ($i = 1; $i <= $messageCount; $i++) {
    // A non-numeric id keeps the array key a string, so the handlers' lookup
    // matches without relying on PHP's numeric-key coercion.
    $messageId = 'msg-' . $i;
    $message = new AMQPMessage(
        json_encode(['id' => $i]),
        [
            'message_id' => $messageId,
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]
    );

    // Register before publishing so the entry exists by the time a confirmation is processed.
    $pendingIds[$messageId] = true;
    $channel->basic_publish($message, '', $queueName);
    $sentCount++;

    // Periodically give the channel a chance to process incoming confirmations.
    if ($sentCount % $batchSize === 0) {
        try {
            $channel->wait_for_pending_acks_returns(5);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Unconfirmed messages stay in $pendingIds; the final wait below
            // gives them another chance to settle.
            echo "确认超时\n";
        }
        echo sprintf(
            "已发送: %d, 已确认: %d, 已拒绝: %d\n",
            $sentCount,
            $confirmedCount,
            $nackedCount
        );
    }
}

// Wait for whatever is still outstanding.
try {
    $channel->wait_for_pending_acks_returns(30);
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
    echo "确认超时\n";
}

echo "\n最终统计:\n";
echo " 已发送: {$sentCount}\n";
echo " 已确认: {$confirmedCount}\n";
echo " 已拒绝: {$nackedCount}\n";

$channel->close();
$connection->close();
批量确认管理类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class BatchConfirmPublisher
{
// Channel object supplied to the constructor; all publishing and confirmation waits go through it.
private $channel;
// publish() waits for confirmations each time $sentCount reaches a multiple of this value.
private $batchSize;
// Total number of messages passed to basic_publish() by this instance.
private $sentCount = 0;
// Delivery tag => message id for messages that have not been acked or nacked yet.
private $pendingMessages = [];
// Number of pending messages removed by handleAck().
private $confirmedCount = 0;
// Number of pending messages removed by handleNack().
private $nackedCount = 0;
/**
 * Switches the channel to confirm mode and registers the ack/nack callbacks.
 *
 * @param mixed $channel   Channel used for publishing and for waiting on confirmations.
 * @param int   $batchSize Number of published messages between confirmation waits; must be >= 1.
 *
 * @throws \InvalidArgumentException When $batchSize is less than 1.
 */
public function __construct($channel, $batchSize = 100)
{
    // publish() computes $sentCount % $batchSize; a zero batch size would raise a
    // DivisionByZeroError on the first publish, so reject it here.
    $batchSize = (int) $batchSize;
    if ($batchSize < 1) {
        throw new \InvalidArgumentException('Batch size must be a positive integer.');
    }

    $this->channel = $channel;
    $this->batchSize = $batchSize;
    $this->channel->confirm_select();
    $this->setupCallbacks();
}
/**
 * Registers the channel's ack and nack handlers.
 *
 * php-amqplib invokes each handler once per settled message and passes the
 * AMQPMessage that was published (it expands "multiple" confirmations itself),
 * so every callback is forwarded as a single-message settlement.
 */
private function setupCallbacks()
{
    $this->channel->set_ack_handler(function (AMQPMessage $message) {
        $this->handleAck($message->getDeliveryTag(), false);
    });
    $this->channel->set_nack_handler(function (AMQPMessage $message) {
        $this->handleNack($message->getDeliveryTag(), false, false);
    });
}

/**
 * Publishes one persistent JSON message and tracks it until it is confirmed.
 *
 * @param string      $exchange   Exchange to publish to.
 * @param string      $routingKey Routing key for the message.
 * @param array       $data       Payload; 'message_id' is merged in before JSON encoding.
 * @param string|null $messageId  Id to use; a uniqid('msg-') value is generated when null.
 *
 * @return string The message id that was used.
 */
public function publish($exchange, $routingKey, $data, $messageId = null)
{
    $messageId = $messageId ?? uniqid('msg-');
    $message = new AMQPMessage(
        json_encode(array_merge($data, ['message_id' => $messageId])),
        [
            'content_type' => 'application/json',
            'message_id' => $messageId,
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]
    );

    $this->channel->basic_publish($message, $exchange, $routingKey);
    $this->sentCount++;

    // In confirm mode basic_publish() records the publish sequence number it
    // assigned on the message, so read the tag from the message itself. The
    // ack/nack callbacks receive this same object and report the same tag.
    $this->pendingMessages[$message->getDeliveryTag()] = $messageId;

    // Wait for confirmations once a full batch has been published.
    if ($this->sentCount % $this->batchSize === 0) {
        $this->waitForConfirms();
    }

    return $messageId;
}
/**
 * Publishes every payload in $messages, in order, through publish().
 *
 * @param string $exchange   Exchange to publish to.
 * @param string $routingKey Routing key shared by all messages.
 * @param array  $messages   List of payload arrays.
 *
 * @return array Message ids returned by publish(), re-indexed from 0.
 */
public function publishBatch($exchange, $routingKey, array $messages)
{
    $publishOne = function ($payload) use ($exchange, $routingKey) {
        return $this->publish($exchange, $routingKey, $payload);
    };

    // array_map keeps the input keys; array_values restores a plain list.
    return array_values(array_map($publishOne, $messages));
}
/**
 * Blocks until the channel has processed its outstanding confirmations, by
 * delegating to the channel's wait_for_pending_acks_returns().
 *
 * @param int $timeout Passed straight to the channel; defaults to 30 (seconds).
 */
public function waitForConfirms($timeout = 30)
{
$this->channel->wait_for_pending_acks_returns($timeout);
}
/**
 * Records a positive confirmation.
 *
 * @param int  $deliveryTag Delivery tag reported by the broker.
 * @param bool $multiple    When true, every pending tag <= $deliveryTag is confirmed.
 */
private function handleAck($deliveryTag, $multiple)
{
    $this->confirmedCount += $this->settlePending($deliveryTag, $multiple);
}

/**
 * Records a negative confirmation.
 *
 * @param int  $deliveryTag Delivery tag reported by the broker.
 * @param bool $multiple    When true, every pending tag <= $deliveryTag is rejected.
 * @param bool $requeue     Accepted for signature compatibility; not used.
 */
private function handleNack($deliveryTag, $multiple, $requeue)
{
    $this->nackedCount += $this->settlePending($deliveryTag, $multiple);
}

/**
 * Removes settled entries from $pendingMessages and returns how many were removed.
 *
 * Tags that are not pending are ignored, so they are not counted.
 *
 * @param int  $deliveryTag Delivery tag being settled.
 * @param bool $multiple    Whether all pending tags up to $deliveryTag are settled.
 *
 * @return int Number of pending messages removed.
 */
private function settlePending($deliveryTag, $multiple)
{
    if (!$multiple) {
        if (!isset($this->pendingMessages[$deliveryTag])) {
            return 0;
        }
        unset($this->pendingMessages[$deliveryTag]);
        return 1;
    }

    $settled = 0;
    // Iterate over a snapshot of the keys so entries can be removed safely.
    foreach (array_keys($this->pendingMessages) as $tag) {
        if ($tag <= $deliveryTag) {
            unset($this->pendingMessages[$tag]);
            $settled++;
        }
    }

    return $settled;
}
/**
 * Returns a snapshot of this publisher's counters.
 *
 * @return array Keys: 'sent', 'confirmed', 'nacked', 'pending'.
 */
public function getStats()
{
    $stats = [];
    $stats['sent'] = $this->sentCount;
    $stats['confirmed'] = $this->confirmedCount;
    $stats['nacked'] = $this->nackedCount;
    $stats['pending'] = count($this->pendingMessages);

    return $stats;
}
/**
 * Waits for outstanding confirmations using the default timeout of
 * waitForConfirms(), then returns the current counters.
 *
 * @return array The array produced by getStats().
 */
public function flush()
{
$this->waitForConfirms();
return $this->getStats();
}
}
实际应用场景
1. 批量订单处理
php
<?php
class BatchOrderPublisher
{
// BatchConfirmPublisher that performs the actual publishing and confirmation tracking.
private $batchPublisher;
// Exchange every order message is published to.
private $exchange = 'orders';
/**
 * Wraps the given channel in a BatchConfirmPublisher with a batch size of 100.
 *
 * @param mixed $channel Channel handed to BatchConfirmPublisher.
 */
public function __construct($channel)
{
$this->batchPublisher = new BatchConfirmPublisher($channel, 100);
}
/**
 * Publishes one 'order.created' message per order, waits for confirmations and
 * prints how many messages of this call were confirmed or rejected.
 *
 * @param array $orders List of orders; each needs 'id', 'customer_id', 'items' and 'total'.
 *
 * @return array Message ids ('order-<id>') in input order.
 */
public function submitOrders(array $orders)
{
    // The publisher's counters are cumulative over its lifetime, so snapshot them
    // first and report only the difference produced by this call.
    $before = $this->batchPublisher->getStats();

    $orderIds = [];
    foreach ($orders as $order) {
        $orderIds[] = $this->batchPublisher->publish(
            $this->exchange,
            'order.created',
            [
                'order_id' => $order['id'],
                'customer_id' => $order['customer_id'],
                'items' => $order['items'],
                'total' => $order['total'],
                'created_at' => time()
            ],
            'order-' . $order['id']
        );
    }

    $after = $this->batchPublisher->flush();
    echo sprintf(
        "批量提交订单: 成功 %d, 失败 %d\n",
        $after['confirmed'] - $before['confirmed'],
        $after['nacked'] - $before['nacked']
    );

    return $orderIds;
}
}
2. 批量数据同步
php
<?php
class BatchSyncPublisher
{
// BatchConfirmPublisher that performs the actual publishing and confirmation tracking.
private $batchPublisher;
// Exchange every sync message is published to.
private $exchange = 'sync';
/**
 * Wraps the given channel in a BatchConfirmPublisher with a batch size of 500.
 *
 * @param mixed $channel Channel handed to BatchConfirmPublisher.
 */
public function __construct($channel)
{
$this->batchPublisher = new BatchConfirmPublisher($channel, 500);
}
/**
 * Publishes one '<entityType>.synced' message per record, waits for confirmations
 * and prints how many messages of this call were confirmed or rejected.
 *
 * @param string $entityType Entity type; used in the routing key and the message id.
 * @param array  $records    List of records; each needs an 'id' key.
 *
 * @return array Message ids ('sync-<entityType>-<id>') in input order.
 */
public function syncRecords($entityType, array $records)
{
    // The publisher's counters are cumulative over its lifetime, so snapshot them
    // first and report only the difference produced by this call.
    $before = $this->batchPublisher->getStats();

    $syncIds = [];
    foreach ($records as $record) {
        $syncIds[] = $this->batchPublisher->publish(
            $this->exchange,
            "{$entityType}.synced",
            [
                'entity_type' => $entityType,
                'entity_id' => $record['id'],
                'data' => $record,
                'synced_at' => time()
            ],
            "sync-{$entityType}-{$record['id']}"
        );
    }

    $after = $this->batchPublisher->flush();
    $confirmed = $after['confirmed'] - $before['confirmed'];
    $nacked = $after['nacked'] - $before['nacked'];
    echo "批量同步 {$entityType}: 成功 {$confirmed}, 失败 {$nacked}\n";

    return $syncIds;
}
}
3. 日志批量发送
php
<?php
class BatchLogPublisher
{
// BatchConfirmPublisher that performs the actual publishing and confirmation tracking.
private $batchPublisher;
// Exchange every log message is published to.
private $exchange = 'logs';
// Log entries collected by log() and not yet handed to the publisher.
private $buffer = [];
// log() triggers flush() once the buffer holds at least this many entries.
private $bufferSize = 100;
/**
 * Wraps the given channel in a BatchConfirmPublisher with a batch size of 1000.
 *
 * @param mixed $channel Channel handed to BatchConfirmPublisher.
 */
public function __construct($channel)
{
$this->batchPublisher = new BatchConfirmPublisher($channel, 1000);
}
/**
 * Buffers one log entry and flushes the buffer once it reaches $bufferSize entries.
 *
 * @param string $level   Log level; becomes the second part of the routing key on flush.
 * @param string $service Service name; becomes the first part of the routing key on flush.
 * @param string $message Log message text.
 * @param array  $context Optional additional data stored with the entry.
 */
public function log($level, $service, $message, $context = [])
{
    $entry = [
        'level' => $level,
        'service' => $service,
        'message' => $message,
        'context' => $context,
        'timestamp' => time(),
        'hostname' => gethostname()
    ];
    $this->buffer[] = $entry;

    // Nothing more to do until the buffer is full.
    if (count($this->buffer) < $this->bufferSize) {
        return;
    }

    $this->flush();
}
/**
 * Publishes all buffered log entries, waits for confirmations and prints how many
 * entries were sent and how many confirmations this flush produced.
 *
 * If publishing fails, the entries whose publish() call did not complete are put
 * back into the buffer and the exception is rethrown. An entry can therefore be
 * delivered more than once, but never silently dropped.
 *
 * @throws \Throwable Whatever the underlying publisher throws.
 */
public function flush()
{
    if (empty($this->buffer)) {
        return;
    }

    // Detach the batch up front so a failure cannot leave already-published
    // entries in the buffer to be re-sent by the next flush or by the destructor.
    $batch = $this->buffer;
    $this->buffer = [];

    // The publisher's counters are cumulative; snapshot to report this flush only.
    $before = $this->batchPublisher->getStats();
    $published = 0;

    try {
        foreach ($batch as $log) {
            $this->batchPublisher->publish(
                $this->exchange,
                "{$log['service']}.{$log['level']}",
                $log
            );
            $published++;
        }
        $stats = $this->batchPublisher->flush();
    } catch (\Throwable $e) {
        // Re-queue only what was not published, ahead of anything buffered meanwhile.
        $this->buffer = array_merge(array_slice($batch, $published), $this->buffer);
        throw $e;
    }

    $confirmed = $stats['confirmed'] - $before['confirmed'];
    echo "批量日志: 发送 " . count($batch) . ", 确认 {$confirmed}\n";
}
/**
 * Flushes any remaining buffered entries when the object is destroyed.
 *
 * A failure is logged instead of rethrown: an exception escaping a destructor
 * during script shutdown is reported as a fatal error.
 */
public function __destruct()
{
    try {
        $this->flush();
    } catch (\Throwable $e) {
        error_log('BatchLogPublisher: failed to flush buffered logs on destruct: ' . $e->getMessage());
    }
}
}
常见问题与解决方案
问题 1: 批量大小选择
症状: 不知道如何设置批量大小
解决方案:
php
<?php
// Choose the batch size according to the scenario
$batchSizes = [
'high_reliability' => 50, // reliability-first scenarios
'balanced' => 200, // balanced scenarios
'high_throughput' => 1000, // throughput-first scenarios
];
// When reliability comes first, use a small batch
$publisher = new BatchConfirmPublisher($channel, 50);
问题 2: 确认超时
症状: 批量确认等待时间过长
解决方案:
php
<?php
// Set a reasonable timeout
try {
$channel->wait_for_pending_acks_returns(30); // 30-second timeout
} catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
echo "确认超时\n";
// Handle the timeout here, e.g. retry or record it
}
问题 3: 部分失败处理
症状: 批量中部分消息失败
解决方案:
php
<?php
// Inspect the counters to detect rejected messages
$stats = $publisher->getStats();
if ($stats['nacked'] > 0) {
// Record how many messages failed
error_log("{$stats['nacked']} 条消息发送失败");
// Retry logic can be added here
// foreach ($failedMessages as $msg) { ... }
}
最佳实践建议
- 合理设置批量大小: 根据可靠性要求选择
- 监控确认状态: 跟踪确认和拒绝数量
- 处理超时: 设置合理的超时时间
- 异常处理: 处理部分失败的情况
- 性能测试: 测试不同批量大小的性能
