Skip to content

批量确认

概述

批量确认机制允许生产者一次性确认多条消息,显著提高消息发送的吞吐量和性能。这是通过在发布确认模式的基础上实现的优化方案。

核心原理

批量确认流程

mermaid
sequenceDiagram
    participant P as 生产者
    participant B as Broker
    
    P->>B: confirm_select()
    
    loop 批量发送
        P->>B: 发送消息 1
        P->>B: 发送消息 2
        P->>B: 发送消息 N
    end
    
    B-->>P: ACK(1..N)
    
    Note over P: 批量确认完成

批量 vs 单条确认

方式网络开销吞吐量复杂度
单条确认
批量确认

批量大小影响

mermaid
graph LR
    subgraph 批量大小影响
        S1[小批量<br/>100条] --> T1[高可靠性]
        S2[中等批量<br/>500条] --> T2[平衡]
        S3[大批量<br/>1000条] --> T3[高吞吐]
    end

PHP 代码示例

基础批量确认

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'batch-confirm-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 启用发布确认模式
$channel->confirm_select();

$batchSize = 100;
$messageCount = 1000;

echo "开始批量发送 {$messageCount} 条消息...\n";

$startTime = microtime(true);

for ($i = 1; $i <= $messageCount; $i++) {
    $message = new AMQPMessage(
        json_encode(['id' => $i, 'data' => 'message ' . $i]),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    
    $channel->basic_publish($message, '', $queueName);
    
    // 达到批次大小时等待确认
    if ($i % $batchSize === 0) {
        try {
            $channel->wait_for_pending_acks();
            $processed = $i / $batchSize;
            $rate = round($i / (microtime(true) - $startTime));
            echo "批次 {$processed} 完成,速率: {$rate} 消息/秒\n";
        } catch (Exception $e) {
            echo "批次确认失败: " . $e->getMessage() . "\n";
        }
    }
}

// 确认剩余消息
if ($messageCount % $batchSize !== 0) {
    $channel->wait_for_pending_acks();
}

$elapsed = round(microtime(true) - $startTime, 2);
$rate = round($messageCount / $elapsed);

echo "\n完成统计:\n";
echo "  总消息数: {$messageCount}\n";
echo "  总耗时: {$elapsed} 秒\n";
echo "  平均速率: {$rate} 消息/秒\n";

$channel->close();
$connection->close();

异步批量确认

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'async-batch-queue';
$channel->queue_declare($queueName, false, true, false, false);
$channel->confirm_select();

// 统计
$sentCount = 0;
$confirmedCount = 0;
$nackedCount = 0;
$pendingTags = [];

// 设置确认回调
$channel->set_ack_handler(function ($deliveryTag, $multiple) use (&$confirmedCount, &$pendingTags) {
    if ($multiple) {
        // 批量确认
        foreach ($pendingTags as $tag => $count) {
            if ($tag <= $deliveryTag) {
                $confirmedCount += $count;
                unset($pendingTags[$tag]);
            }
        }
    } else {
        $confirmedCount++;
        unset($pendingTags[$deliveryTag]);
    }
});

$channel->set_nack_handler(function ($deliveryTag, $multiple, $requeue) use (&$nackedCount, &$pendingTags) {
    if ($multiple) {
        foreach ($pendingTags as $tag => $count) {
            if ($tag <= $deliveryTag) {
                $nackedCount += $count;
                unset($pendingTags[$tag]);
            }
        }
    } else {
        $nackedCount++;
        unset($pendingTags[$deliveryTag]);
    }
});

// 发送消息
$batchSize = 500;
$messageCount = 2000;

for ($i = 1; $i <= $messageCount; $i++) {
    $message = new AMQPMessage(
        json_encode(['id' => $i]),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    
    $channel->basic_publish($message, '', $queueName);
    $sentCount++;
    
    // 获取待确认的 delivery tag
    $deliveryTag = $channel->getNextDeliveryTag();
    $pendingTags[$deliveryTag] = 1;
    
    // 定期处理确认
    if ($sentCount % $batchSize === 0) {
        $channel->wait_for_pending_acks_returns(5);
        
        echo sprintf(
            "已发送: %d, 已确认: %d, 已拒绝: %d\n",
            $sentCount,
            $confirmedCount,
            $nackedCount
        );
    }
}

// 等待剩余确认
$channel->wait_for_pending_acks_returns(30);

echo "\n最终统计:\n";
echo "  已发送: {$sentCount}\n";
echo "  已确认: {$confirmedCount}\n";
echo "  已拒绝: {$nackedCount}\n";

$channel->close();
$connection->close();

批量确认管理类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class BatchConfirmPublisher
{
    private $channel;
    private $batchSize;
    private $sentCount = 0;
    private $pendingMessages = [];
    private $confirmedCount = 0;
    private $nackedCount = 0;
    
    public function __construct($channel, $batchSize = 100)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        
        $this->channel->confirm_select();
        $this->setupCallbacks();
    }
    
    private function setupCallbacks()
    {
        $this->channel->set_ack_handler(function ($deliveryTag, $multiple) {
            $this->handleAck($deliveryTag, $multiple);
        });
        
        $this->channel->set_nack_handler(function ($deliveryTag, $multiple, $requeue) {
            $this->handleNack($deliveryTag, $multiple, $requeue);
        });
    }
    
    public function publish($exchange, $routingKey, $data, $messageId = null)
    {
        $messageId = $messageId ?? uniqid('msg-');
        
        $message = new AMQPMessage(
            json_encode(array_merge($data, ['message_id' => $messageId])),
            [
                'content_type' => 'application/json',
                'message_id' => $messageId,
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );
        
        $this->channel->basic_publish($message, $exchange, $routingKey);
        
        $this->sentCount++;
        $deliveryTag = $this->channel->getNextDeliveryTag();
        $this->pendingMessages[$deliveryTag] = $messageId;
        
        // 达到批次大小时等待确认
        if ($this->sentCount % $this->batchSize === 0) {
            $this->waitForConfirms();
        }
        
        return $messageId;
    }
    
    public function publishBatch($exchange, $routingKey, array $messages)
    {
        $ids = [];
        
        foreach ($messages as $data) {
            $ids[] = $this->publish($exchange, $routingKey, $data);
        }
        
        return $ids;
    }
    
    public function waitForConfirms($timeout = 30)
    {
        $this->channel->wait_for_pending_acks_returns($timeout);
    }
    
    private function handleAck($deliveryTag, $multiple)
    {
        if ($multiple) {
            $count = 0;
            foreach ($this->pendingMessages as $tag => $id) {
                if ($tag <= $deliveryTag) {
                    $count++;
                    unset($this->pendingMessages[$tag]);
                }
            }
            $this->confirmedCount += $count;
        } else {
            if (isset($this->pendingMessages[$deliveryTag])) {
                unset($this->pendingMessages[$deliveryTag]);
                $this->confirmedCount++;
            }
        }
    }
    
    private function handleNack($deliveryTag, $multiple, $requeue)
    {
        if ($multiple) {
            $count = 0;
            foreach ($this->pendingMessages as $tag => $id) {
                if ($tag <= $deliveryTag) {
                    $count++;
                    unset($this->pendingMessages[$tag]);
                }
            }
            $this->nackedCount += $count;
        } else {
            if (isset($this->pendingMessages[$deliveryTag])) {
                unset($this->pendingMessages[$deliveryTag]);
                $this->nackedCount++;
            }
        }
    }
    
    public function getStats()
    {
        return [
            'sent' => $this->sentCount,
            'confirmed' => $this->confirmedCount,
            'nacked' => $this->nackedCount,
            'pending' => count($this->pendingMessages)
        ];
    }
    
    public function flush()
    {
        $this->waitForConfirms();
        
        return $this->getStats();
    }
}

实际应用场景

1. 批量订单处理

php
<?php

class BatchOrderPublisher
{
    private $batchPublisher;
    private $exchange = 'orders';
    
    public function __construct($channel)
    {
        $this->batchPublisher = new BatchConfirmPublisher($channel, 100);
    }
    
    public function submitOrders(array $orders)
    {
        $orderIds = [];
        
        foreach ($orders as $order) {
            $orderId = $this->batchPublisher->publish(
                $this->exchange,
                'order.created',
                [
                    'order_id' => $order['id'],
                    'customer_id' => $order['customer_id'],
                    'items' => $order['items'],
                    'total' => $order['total'],
                    'created_at' => time()
                ],
                'order-' . $order['id']
            );
            
            $orderIds[] = $orderId;
        }
        
        $stats = $this->batchPublisher->flush();
        
        echo sprintf(
            "批量提交订单: 成功 %d, 失败 %d\n",
            $stats['confirmed'],
            $stats['nacked']
        );
        
        return $orderIds;
    }
}

2. 批量数据同步

php
<?php

class BatchSyncPublisher
{
    private $batchPublisher;
    private $exchange = 'sync';
    
    public function __construct($channel)
    {
        $this->batchPublisher = new BatchConfirmPublisher($channel, 500);
    }
    
    public function syncRecords($entityType, array $records)
    {
        $syncIds = [];
        
        foreach ($records as $record) {
            $syncId = $this->batchPublisher->publish(
                $this->exchange,
                "{$entityType}.synced",
                [
                    'entity_type' => $entityType,
                    'entity_id' => $record['id'],
                    'data' => $record,
                    'synced_at' => time()
                ],
                "sync-{$entityType}-{$record['id']}"
            );
            
            $syncIds[] = $syncId;
        }
        
        $stats = $this->batchPublisher->flush();
        
        echo "批量同步 {$entityType}: 成功 {$stats['confirmed']}, 失败 {$stats['nacked']}\n";
        
        return $syncIds;
    }
}

3. 日志批量发送

php
<?php

class BatchLogPublisher
{
    private $batchPublisher;
    private $exchange = 'logs';
    private $buffer = [];
    private $bufferSize = 100;
    
    public function __construct($channel)
    {
        $this->batchPublisher = new BatchConfirmPublisher($channel, 1000);
    }
    
    public function log($level, $service, $message, $context = [])
    {
        $this->buffer[] = [
            'level' => $level,
            'service' => $service,
            'message' => $message,
            'context' => $context,
            'timestamp' => time(),
            'hostname' => gethostname()
        ];
        
        if (count($this->buffer) >= $this->bufferSize) {
            $this->flush();
        }
    }
    
    public function flush()
    {
        if (empty($this->buffer)) {
            return;
        }
        
        foreach ($this->buffer as $log) {
            $this->batchPublisher->publish(
                $this->exchange,
                "{$log['service']}.{$log['level']}",
                $log
            );
        }
        
        $stats = $this->batchPublisher->flush();
        
        echo "批量日志: 发送 " . count($this->buffer) . ", 确认 {$stats['confirmed']}\n";
        
        $this->buffer = [];
    }
    
    public function __destruct()
    {
        $this->flush();
    }
}

常见问题与解决方案

问题 1: 批量大小选择

症状: 不知道如何设置批量大小

解决方案:

php
<?php

// 根据场景选择批量大小
$batchSizes = [
    'high_reliability' => 50,    // 高可靠性场景
    'balanced' => 200,           // 平衡场景
    'high_throughput' => 1000,   // 高吞吐场景
];

// 可靠性优先使用小批量
$publisher = new BatchConfirmPublisher($channel, 50);

问题 2: 确认超时

症状: 批量确认等待时间过长

解决方案:

php
<?php

// 设置合理的超时时间
try {
    $channel->wait_for_pending_acks_returns(30);  // 30秒超时
} catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
    echo "确认超时\n";
    // 处理超时,可以重试或记录
}

问题 3: 部分失败处理

症状: 批量中部分消息失败

解决方案:

php
<?php

// 获取失败消息并重试
$stats = $publisher->getStats();

if ($stats['nacked'] > 0) {
    // 记录失败的消息
    error_log("{$stats['nacked']} 条消息发送失败");
    
    // 可以实现重试逻辑
    // foreach ($failedMessages as $msg) { ... }
}

最佳实践建议

  1. 合理设置批量大小: 根据可靠性要求选择
  2. 监控确认状态: 跟踪确认和拒绝数量
  3. 处理超时: 设置合理的超时时间
  4. 异常处理: 处理部分失败的情况
  5. 性能测试: 测试不同批量大小的性能

相关链接