Appearance
RabbitMQ 批量处理优化
概述
批量处理是提升 RabbitMQ 吞吐量的有效手段。通过合理使用批量发布、批量确认和批量消费,可以显著减少网络往返次数和协议开销,从而大幅提升系统性能。
核心知识点
批量处理的优势
┌─────────────────────────────────────────────────────────────┐
│ 单条 vs 批量处理 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 单条发送: │
│ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │
│ │ M │──▶│ M │──▶│ M │──▶│ M │──▶│ M │ 5次网络往返 │
│ └───┘ └───┘ └───┘ └───┘ └───┘ │
│ │
│ 批量发送: │
│ ┌───┬───┬───┬───┬───┐ │
│ │ M │ M │ M │ M │ M │──────────────────▶ 1次网络往返 │
│ └───┴───┴───┴───┴───┘ │
│ │
│ 性能提升: │
│ - 网络往返减少 80%+ │
│ - 协议开销减少 80%+ │
│ - 吞吐量提升 2-5 倍 │
└─────────────────────────────────────────────────────────────┘
批量处理类型
| 类型 | 说明 | 适用场景 |
|---|---|---|
| 批量发布 | 一次发送多条消息 | 生产者高吞吐 |
| 批量确认 | 一次确认多条消息 | 减少确认开销 |
| 批量消费 | 一次消费多条消息 | 消费者高吞吐 |
| 事务批量 | 批量提交事务 | 原子性要求高 |
批量处理参数
| 参数 | 默认值 | 说明 | 推荐值 |
|---|---|---|---|
| 批量大小 | - | 每批消息数 | 50-500 |
| 批量字节数 | - | 每批最大字节数 | 64KB-1MB |
| 批量超时 | - | 等待凑批超时 | 10-100ms |
| 确认批量 | - | 每次确认消息数 | 10-100 |
批量处理注意事项
- 原子性:批量发布不保证原子性,一批消息可能部分成功、部分失败(对原子性要求高时可考虑事务批量)
- 顺序性:同一信道上批量内消息按发布顺序投递,但被 nack 重新入队的消息可能打乱顺序
- 确认机制:使用 multiple 标志批量确认或整批 nack 时,单条失败可能影响整批
- 内存占用:批量会占用更多内存
配置示例
生产者批量配置
php
<?php
namespace App\RabbitMQ\Batch;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Buffers outgoing messages and publishes them to one channel in batches.
 *
 * The buffered batch is flushed as soon as any limit is reached:
 * - message count ($batchSize),
 * - total body bytes ($batchMaxBytes),
 * - age of the oldest buffered message ($batchTimeoutMs).
 *
 * All three limits are evaluated on every publish(). Producers that may go
 * idle should also call checkTimeout() periodically, so a partial batch does
 * not wait for the next publish().
 */
class BatchPublisher
{
    private AMQPChannel $channel;
    private int $batchSize;
    private int $batchMaxBytes;
    private int $batchTimeoutMs;
    /** @var list<array{body: string, exchange: string, routing_key: string, properties: array}> */
    private array $pendingMessages = [];
    /** Sum of strlen() of the buffered bodies. */
    private int $pendingBytes = 0;
    /** microtime(true) when the current batch received its first message; null when the buffer is empty. */
    private ?float $batchStartTime = null;

    /**
     * @param AMQPChannel $channel        Channel the batches are published on.
     * @param int         $batchSize      Flush once this many messages are buffered.
     * @param int         $batchMaxBytes  Flush once the buffered bodies total this many bytes.
     * @param int         $batchTimeoutMs Flush once the oldest buffered message is this many ms old.
     */
    public function __construct(
        AMQPChannel $channel,
        int $batchSize = 100,
        int $batchMaxBytes = 65536,
        int $batchTimeoutMs = 50
    ) {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->batchMaxBytes = $batchMaxBytes;
        $this->batchTimeoutMs = $batchTimeoutMs;
    }

    /**
     * Buffers one message and flushes the batch if a limit has been reached.
     *
     * @param array $properties AMQPMessage properties (e.g. 'delivery_mode').
     * @return bool true if this call flushed the batch, false if the message is still buffered.
     */
    public function publish(
        string $body,
        string $exchange = '',
        string $routingKey = '',
        array $properties = []
    ): bool {
        $this->startBatchIfNeeded();

        $this->pendingMessages[] = [
            'body' => $body,
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'properties' => $properties,
        ];
        // Byte length on purpose: the limit is about payload size, not characters.
        $this->pendingBytes += strlen($body);

        if (!$this->shouldFlush()) {
            return false;
        }

        $this->flush();

        return true;
    }

    /**
     * Publishes every buffered message and empties the buffer.
     *
     * Each message is queued with batch_basic_publish(), and the whole batch is
     * sent by one publish_batch() call. This replaces one basic_publish() per
     * message, which gave no batching on the wire.
     *
     * @return int Number of messages handed to the channel (0 if the buffer was empty).
     */
    public function flush(): int
    {
        if ($this->pendingMessages === []) {
            return 0;
        }

        $batch = $this->pendingMessages;
        // Reset local state before the network write. If publish_batch() throws,
        // a retried flush() must not queue the same messages a second time.
        $this->pendingMessages = [];
        $this->pendingBytes = 0;
        $this->batchStartTime = null;

        foreach ($batch as $msg) {
            $this->channel->batch_basic_publish(
                new AMQPMessage($msg['body'], $msg['properties']),
                $msg['exchange'],
                $msg['routing_key']
            );
        }
        $this->channel->publish_batch();

        return count($batch);
    }

    /**
     * Flushes the buffered batch if its oldest message has exceeded the timeout.
     *
     * @return bool true if a flush happened.
     */
    public function checkTimeout(): bool
    {
        if ($this->pendingMessages === [] || !$this->isBatchExpired()) {
            return false;
        }

        $this->flush();

        return true;
    }

    /** Number of messages currently buffered. */
    public function getPendingCount(): int
    {
        return count($this->pendingMessages);
    }

    /** Records the start time of a new batch when the buffer was empty. */
    private function startBatchIfNeeded(): void
    {
        if ($this->batchStartTime === null) {
            $this->batchStartTime = microtime(true);
        }
    }

    /** True when the count, byte or age limit has been reached. */
    private function shouldFlush(): bool
    {
        return count($this->pendingMessages) >= $this->batchSize
            || $this->pendingBytes >= $this->batchMaxBytes
            || $this->isBatchExpired();
    }

    /** True when the current batch is at least $batchTimeoutMs old. */
    private function isBatchExpired(): bool
    {
        if ($this->batchStartTime === null) {
            return false;
        }

        return (microtime(true) - $this->batchStartTime) * 1000 >= $this->batchTimeoutMs;
    }
}
批量确认配置
php
<?php
namespace App\RabbitMQ\Batch;
use PhpAmqpLib\Channel\AMQPChannel;
/**
 * Tracks publisher-confirm outcomes for messages published on one channel.
 *
 * The constructor puts the channel into confirm mode and registers ack/nack
 * handlers on it. Registering in the constructor means confirms processed by
 * any wait on the channel are recorded, not only those seen inside
 * waitForConfirms().
 *
 * Call trackPublish() once per publish on this channel, in publish order.
 * That keeps the local sequence numbers (starting at 1) aligned with the
 * delivery tags the channel assigns after confirm_select().
 */
class BatchConfirmer
{
    private AMQPChannel $channel;
    private int $batchSize;
    private int $pendingAcks = 0;
    /** @var array<int, array{message_id: ?string, timestamp: float}> sequence number => metadata, not yet acked or nacked */
    private array $unconfirmedMessages = [];
    private int $nextPublishSeqNo = 1;
    /** @var list<array> entries acked since the last waitForConfirms() */
    private array $confirmedSinceLastWait = [];
    /** @var list<array> entries nacked since the last waitForConfirms() */
    private array $nackedSinceLastWait = [];

    public function __construct(AMQPChannel $channel, int $batchSize = 100)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->enableConfirmMode();
        $this->registerConfirmHandlers();
    }

    /**
     * Records one publish and returns its sequence number (expected delivery tag).
     *
     * @param string|null $messageId Optional caller identifier stored with the entry.
     */
    public function trackPublish(?string $messageId = null): int
    {
        $seqNo = $this->nextPublishSeqNo++;
        $this->unconfirmedMessages[$seqNo] = [
            'message_id' => $messageId,
            'timestamp' => microtime(true),
        ];
        $this->pendingAcks++;

        return $seqNo;
    }

    /**
     * Waits for outstanding confirms, then reports the entries settled since the last call.
     *
     * @return array{confirmed: list<array>, nacked: list<array>, total: int}
     */
    public function waitForConfirms(int $timeoutMs = 5000): array
    {
        $this->channel->wait_for_pending_acks_returns($timeoutMs / 1000);

        $confirmed = $this->confirmedSinceLastWait;
        $nacked = $this->nackedSinceLastWait;
        $this->confirmedSinceLastWait = [];
        $this->nackedSinceLastWait = [];

        return [
            'confirmed' => $confirmed,
            'nacked' => $nacked,
            'total' => count($this->unconfirmedMessages) + count($confirmed) + count($nacked),
        ];
    }

    /** Number of tracked publishes not yet acked or nacked. */
    public function getPendingCount(): int
    {
        return count($this->unconfirmedMessages);
    }

    private function enableConfirmMode(): void
    {
        $this->channel->confirm_select();
    }

    /**
     * Installs ack/nack handlers on the channel.
     *
     * php-amqplib invokes confirm handlers with the published AMQPMessage, not
     * with ($deliveryTag, $multiple). The old two-argument closures failed with
     * ArgumentCountError. Both shapes are accepted here.
     * NOTE(review): another set_ack_handler()/set_nack_handler() call on the
     * same channel replaces these handlers.
     */
    private function registerConfirmHandlers(): void
    {
        $this->channel->set_ack_handler(function ($payload, $multiple = false): void {
            $this->settle($payload, (bool) $multiple, $this->confirmedSinceLastWait);
        });
        $this->channel->set_nack_handler(function ($payload, $multiple = false): void {
            $this->settle($payload, (bool) $multiple, $this->nackedSinceLastWait);
        });
    }

    /**
     * Moves the entry (or, when $multiple, every entry up to the tag) from the
     * unconfirmed set into $bucket.
     */
    private function settle($payload, bool $multiple, array &$bucket): void
    {
        $deliveryTag = $this->resolveDeliveryTag($payload);
        if ($deliveryTag === null) {
            // Cannot attribute this confirm; the entry stays in the unconfirmed set.
            return;
        }

        if (!$multiple) {
            if (isset($this->unconfirmedMessages[$deliveryTag])) {
                $bucket[] = $this->unconfirmedMessages[$deliveryTag];
                unset($this->unconfirmedMessages[$deliveryTag]);
            }

            return;
        }

        foreach ($this->unconfirmedMessages as $tag => $entry) {
            if ($tag <= $deliveryTag) {
                $bucket[] = $entry;
                unset($this->unconfirmedMessages[$tag]);
            }
        }
    }

    /** Extracts a delivery tag from a raw tag or from a published AMQPMessage; null if unavailable. */
    private function resolveDeliveryTag($payload): ?int
    {
        if (is_numeric($payload)) {
            return (int) $payload;
        }

        if ($payload instanceof \PhpAmqpLib\Message\AMQPMessage) {
            try {
                return (int) $payload->getDeliveryTag();
            } catch (\RuntimeException $e) {
                // The message carries no delivery tag (library-version dependent).
                return null;
            }
        }

        return null;
    }
}
批量消费者配置
php
<?php
namespace App\RabbitMQ\Batch;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Consumes a queue and hands messages to a callback in batches.
 *
 * A batch is handed over when it reaches $batchSize messages, or when its
 * first message is $batchTimeoutMs old, whichever comes first. The callback
 * returns a per-index result ('ack', 'nack' or 'reject'), which is then
 * applied to each message.
 */
class BatchConsumer
{
    private AMQPChannel $channel;
    private int $batchSize;
    private int $batchTimeoutMs;
    /** @var AMQPMessage[] messages received but not yet handed to the callback */
    private array $batch = [];
    /** microtime(true) of the first message in $batch; null when $batch is empty. */
    private ?float $batchStartTime = null;

    public function __construct(
        AMQPChannel $channel,
        int $batchSize = 50,
        int $batchTimeoutMs = 100
    ) {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->batchTimeoutMs = $batchTimeoutMs;
    }

    /**
     * Consumes $queue until the channel stops consuming.
     *
     * @param callable $batchCallback Called with a list of AMQPMessage. Should return an
     *                                array mapping list index => 'ack' | 'nack' | 'reject';
     *                                missing indexes (or a non-array return) mean 'ack'.
     * @param int|null $prefetchCount Applied with basic_qos() when not null.
     */
    public function consume(
        string $queue,
        callable $batchCallback,
        ?int $prefetchCount = null
    ): void {
        if ($prefetchCount !== null) {
            $this->channel->basic_qos(null, $prefetchCount, null);
        }

        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($batchCallback) {
                $this->addToBatch($message, $batchCallback);
            }
        );

        // php-amqplib treats a 0 wait timeout as "block forever", which would
        // stop partial batches from ever timing out; wait at least 1 ms.
        $waitSeconds = max($this->batchTimeoutMs, 1) / 1000;

        while ($this->channel->is_consuming()) {
            $this->waitForMessages($waitSeconds);
            $this->checkBatchTimeout($batchCallback);
        }
    }

    /**
     * Consumes $queue for roughly $totalTimeoutMs, then flushes any partial batch.
     *
     * Prefetch is set to $batchSize.
     * NOTE(review): the consumer registered here is not cancelled on return;
     * consider basic_cancel() if the channel is reused.
     *
     * @return int Number of messages received during the call.
     */
    public function consumeWithTimeout(
        string $queue,
        callable $batchCallback,
        int $totalTimeoutMs
    ): int {
        $deadline = microtime(true) + $totalTimeoutMs / 1000;
        $processed = 0;

        $this->channel->basic_qos(null, $this->batchSize, null);
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($batchCallback, &$processed) {
                $this->addToBatch($message, $batchCallback);
                $processed++;
            }
        );

        while (microtime(true) < $deadline) {
            if (!$this->channel->is_consuming()) {
                // No more deliveries can arrive; don't spin until the deadline.
                break;
            }
            $this->waitForMessages(0.1);
            $this->checkBatchTimeout($batchCallback);
        }

        $this->flushBatch($batchCallback);

        return $processed;
    }

    /**
     * Waits up to $timeoutSeconds for channel activity, treating a timeout as "nothing arrived".
     *
     * AMQPChannel::wait() throws AMQPTimeoutException when the timeout elapses
     * with no frame. Without this guard, an idle queue aborted the consume loop
     * before checkBatchTimeout() could flush a partial batch.
     */
    private function waitForMessages(float $timeoutSeconds): void
    {
        try {
            $this->channel->wait(null, false, $timeoutSeconds);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Idle period: let the caller check the batch timeout.
        }
    }

    /** Appends a delivery to the batch, flushing when the batch reaches $batchSize. */
    private function addToBatch(AMQPMessage $message, callable $batchCallback): void
    {
        if ($this->batchStartTime === null) {
            $this->batchStartTime = microtime(true);
        }

        $this->batch[] = $message;

        if (count($this->batch) >= $this->batchSize) {
            $this->flushBatch($batchCallback);
        }
    }

    /** Flushes the batch if its first message is at least $batchTimeoutMs old. */
    private function checkBatchTimeout(callable $batchCallback): void
    {
        if ($this->batchStartTime === null || $this->batch === []) {
            return;
        }

        $elapsedMs = (microtime(true) - $this->batchStartTime) * 1000;
        if ($elapsedMs >= $this->batchTimeoutMs) {
            $this->flushBatch($batchCallback);
        }
    }

    /** Hands the current batch to the callback and settles each message per its result. */
    private function flushBatch(callable $batchCallback): void
    {
        if ($this->batch === []) {
            return;
        }

        // Detach the batch first so the next batch starts empty even if the callback throws.
        $messages = $this->batch;
        $this->batch = [];
        $this->batchStartTime = null;

        $results = $batchCallback($messages);
        // A callback that returns nothing is treated like one that acks everything,
        // matching the per-index default below (previously a TypeError).
        $this->handleBatchResults($messages, is_array($results) ? $results : []);
    }

    /**
     * Applies per-index results:
     * - 'ack' (the default for missing indexes) acks the message;
     * - 'nack' nacks it with requeue;
     * - 'reject' rejects it without requeue;
     * - any other value leaves the message unacknowledged.
     */
    private function handleBatchResults(array $messages, array $results): void
    {
        foreach ($messages as $index => $message) {
            $result = $results[$index] ?? 'ack';

            switch ($result) {
                case 'ack':
                    $message->ack();
                    break;
                case 'nack':
                    $message->nack(true);
                    break;
                case 'reject':
                    $message->reject(false);
                    break;
            }
        }
    }
}
PHP 代码示例
完整批量处理系统
php
<?php
namespace App\RabbitMQ\Batch;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Channel\AMQPChannel;
/**
 * Facade that wires a BatchPublisher, BatchConsumer and BatchConfirmer onto
 * one channel opened from the given connection.
 *
 * Constructing the BatchConfirmer puts that shared channel into confirm mode.
 */
class BatchProcessingSystem
{
    private AMQPStreamConnection $connection;
    private AMQPChannel $channel;
    private BatchPublisher $publisher;
    private BatchConsumer $consumer;
    private BatchConfirmer $confirmer;

    public function __construct(
        AMQPStreamConnection $connection,
        int $publishBatchSize = 100,
        int $consumeBatchSize = 50
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();

        // Publisher: 64 KiB byte limit, 50 ms batch timeout.
        $this->publisher = new BatchPublisher(
            $this->channel,
            $publishBatchSize,
            65536,
            50
        );
        // Consumer: 100 ms batch timeout.
        $this->consumer = new BatchConsumer(
            $this->channel,
            $consumeBatchSize,
            100
        );
        $this->confirmer = new BatchConfirmer($this->channel, $publishBatchSize);
    }

    /**
     * Publishes $messages as persistent messages (delivery_mode 2) and optionally waits for confirms.
     *
     * Array messages are JSON-encoded; anything else is passed to the publisher as is.
     *
     * @return array{published: int, sequence_numbers: int[], confirmed?: int, nacked?: int}
     * @throws \JsonException If an array message cannot be JSON-encoded.
     */
    public function publishBatch(
        array $messages,
        string $exchange = '',
        string $routingKey = '',
        bool $waitForConfirm = true
    ): array {
        $published = 0;
        $sequenceNumbers = [];

        foreach ($messages as $message) {
            // Fail loudly instead of publishing an empty body when encoding fails.
            $body = is_array($message) ? json_encode($message, JSON_THROW_ON_ERROR) : $message;

            $this->publisher->publish($body, $exchange, $routingKey, [
                'delivery_mode' => 2,
            ]);
            // Track in the same order as publish() so sequence numbers match delivery tags.
            $sequenceNumbers[] = $this->confirmer->trackPublish();
            $published++;
        }

        $this->publisher->flush();

        if (!$waitForConfirm) {
            return [
                'published' => $published,
                'sequence_numbers' => $sequenceNumbers,
            ];
        }

        $confirmResult = $this->confirmer->waitForConfirms();

        return [
            'published' => $published,
            'confirmed' => count($confirmResult['confirmed']),
            'nacked' => count($confirmResult['nacked']),
            'sequence_numbers' => $sequenceNumbers,
        ];
    }

    /**
     * Consumes $queue, calling $processor with each message body.
     *
     * A message whose processor call throws an \Exception is nacked (requeued)
     * and recorded in 'errors'. Other messages are acked.
     * NOTE(review): $maxMessages only selects the bounded mode (30 s via
     * consumeWithTimeout); the count itself is not enforced. With null this
     * call consumes until the channel stops consuming.
     *
     * @return array{processed: int, errors: list<array{message: string, error: string}>, duration: float, rate: float}
     */
    public function consumeBatch(
        string $queue,
        callable $processor,
        ?int $maxMessages = null
    ): array {
        $processed = 0;
        $errors = [];
        $startTime = microtime(true);

        $batchProcessor = function (array $messages) use ($processor, &$processed, &$errors): array {
            $results = [];

            foreach ($messages as $index => $message) {
                try {
                    $processor($message->body);
                    $results[$index] = 'ack';
                    $processed++;
                } catch (\Exception $e) {
                    $results[$index] = 'nack';
                    $errors[] = [
                        'message' => $message->body,
                        'error' => $e->getMessage(),
                    ];
                }
            }

            return $results;
        };

        if ($maxMessages !== null) {
            $this->consumer->consumeWithTimeout($queue, $batchProcessor, 30000);
        } else {
            $this->consumer->consume($queue, $batchProcessor);
        }

        // Sample the clock once so 'duration' and 'rate' agree, and guard against a zero interval.
        $duration = microtime(true) - $startTime;

        return [
            'processed' => $processed,
            'errors' => $errors,
            'duration' => round($duration, 3),
            'rate' => $duration > 0 ? round($processed / $duration, 2) : 0.0,
        ];
    }

    /** Pending counts of the publisher buffer and the confirmer's unconfirmed set. */
    public function getStats(): array
    {
        return [
            'publisher' => [
                'pending' => $this->publisher->getPendingCount(),
            ],
            'confirmer' => [
                'pending' => $this->confirmer->getPendingCount(),
            ],
        ];
    }
}
批量性能测试
php
<?php
namespace App\RabbitMQ\Performance;
use App\RabbitMQ\Batch\BatchPublisher;
use App\RabbitMQ\Batch\BatchConsumer;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Benchmarks batch publishing, batched publisher confirms and timeout-driven
 * batching against a live broker.
 *
 * Each test opens its own channel and declares a throw-away auto-delete
 * queue. At the end it deletes the queue and closes the channel.
 */
class BatchPerformanceTester
{
    private AMQPStreamConnection $connection;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * Runs the publish/consume benchmark once per entry in $batchSizes.
     *
     * @param int[] $batchSizes
     * @return array<int, array> batch size => benchmark result
     * @throws \InvalidArgumentException If a batch size is below 1.
     */
    public function compareBatchSizes(
        string $queueName,
        int $totalMessages,
        array $batchSizes
    ): array {
        $results = [];

        foreach ($batchSizes as $batchSize) {
            $results[$batchSize] = $this->testBatchSize($queueName, $totalMessages, $batchSize);
        }

        return $results;
    }

    /**
     * Publishes $totalMessages 1 KiB messages in batches of $batchSize, then
     * consumes them all with prefetch = $batchSize and reports timings.
     *
     * NOTE(review): the consume phase blocks until every message has been received.
     */
    private function testBatchSize(
        string $queueName,
        int $totalMessages,
        int $batchSize
    ): array {
        self::assertPositiveBatchSize($batchSize);

        $channel = $this->connection->channel();
        $channel->queue_declare($queueName, false, false, false, true);

        $message = str_repeat('x', 1024);

        $publishStart = microtime(true);
        $batch = [];
        for ($i = 0; $i < $totalMessages; $i++) {
            $batch[] = $message;
            if (count($batch) >= $batchSize) {
                $this->publishBatch($channel, $queueName, $batch);
                $batch = [];
            }
        }
        if ($batch !== []) {
            $this->publishBatch($channel, $queueName, $batch);
        }
        $publishTime = microtime(true) - $publishStart;

        $consumeStart = microtime(true);
        $consumed = 0;
        $channel->basic_qos(null, $batchSize, null);
        $channel->basic_consume($queueName, '', false, false, false, false,
            function ($msg) use (&$consumed) {
                $consumed++;
                $msg->ack();
            }
        );
        while ($consumed < $totalMessages) {
            $channel->wait();
        }
        $consumeTime = microtime(true) - $consumeStart;

        $channel->queue_delete($queueName);
        $channel->close();

        return [
            'batch_size' => $batchSize,
            'total_messages' => $totalMessages,
            'publish_time' => round($publishTime, 3),
            'consume_time' => round($consumeTime, 3),
            'publish_rate' => round($totalMessages / $publishTime, 2),
            'consume_rate' => round($totalMessages / $consumeTime, 2),
            'network_roundtrips' => ceil($totalMessages / $batchSize),
        ];
    }

    /**
     * Publishes $messages to the default exchange with one publish_batch() call.
     *
     * The messages are queued with batch_basic_publish() and sent together.
     * Previously each was published individually, so the batch size had no
     * effect on the publish phase being measured.
     */
    private function publishBatch($channel, string $queue, array $messages): void
    {
        foreach ($messages as $msg) {
            $channel->batch_basic_publish(new AMQPMessage($msg), '', $queue);
        }

        $channel->publish_batch();
    }

    /**
     * Publishes in confirm mode, waiting for all pending confirms after every $batchSize messages.
     *
     * @throws \InvalidArgumentException If $batchSize is below 1.
     */
    public function testBatchConfirm(
        string $queueName,
        int $totalMessages,
        int $batchSize
    ): array {
        self::assertPositiveBatchSize($batchSize);

        $channel = $this->connection->channel();
        $channel->queue_declare($queueName, false, false, false, true);
        $channel->confirm_select();

        $message = str_repeat('x', 1024);
        $start = microtime(true);
        $published = 0;

        while ($published < $totalMessages) {
            $batchCount = min($batchSize, $totalMessages - $published);
            for ($i = 0; $i < $batchCount; $i++) {
                $channel->basic_publish(new AMQPMessage($message), '', $queueName);
                $published++;
            }
            $channel->wait_for_pending_acks();
        }

        $totalTime = microtime(true) - $start;

        $channel->queue_delete($queueName);
        $channel->close();

        return [
            'batch_size' => $batchSize,
            'total_messages' => $totalMessages,
            'total_time' => round($totalTime, 3),
            'rate' => round($totalMessages / $totalTime, 2),
            'confirm_batches' => ceil($totalMessages / $batchSize),
        ];
    }

    /** Publishes $messageCount small messages through a BatchPublisher with the given limits. */
    public function testBatchTimeout(
        string $queueName,
        int $messageCount,
        int $batchSize,
        int $batchTimeoutMs
    ): array {
        $channel = $this->connection->channel();
        $channel->queue_declare($queueName, false, false, false, true);

        $publisher = new BatchPublisher($channel, $batchSize, 65536, $batchTimeoutMs);

        $start = microtime(true);
        for ($i = 0; $i < $messageCount; $i++) {
            $publisher->publish('message_' . $i, '', $queueName);
        }
        $publisher->flush();
        $totalTime = microtime(true) - $start;

        $channel->queue_delete($queueName);
        $channel->close();

        return [
            'message_count' => $messageCount,
            'batch_size' => $batchSize,
            'batch_timeout_ms' => $batchTimeoutMs,
            'total_time' => round($totalTime, 3),
            'rate' => round($messageCount / $totalTime, 2),
        ];
    }

    /**
     * Rejects batch sizes below 1.
     *
     * A size below 1 made the publish loop publish one message at a time and
     * then divide by zero when computing the result.
     */
    private static function assertPositiveBatchSize(int $batchSize): void
    {
        if ($batchSize < 1) {
            throw new \InvalidArgumentException('batchSize must be at least 1');
        }
    }
}
批量消息聚合器
php
<?php
namespace App\RabbitMQ\Batch;
/**
 * Groups messages under caller-chosen keys and releases each group as a batch.
 *
 * A group is released:
 * - by add(), once it reaches its message-count or byte limit;
 * - by checkTimeouts(), once it is older than its timeout;
 * - on demand, by flush().
 *
 * Released batches are returned to the caller; this class publishes nothing itself.
 */
class MessageAggregator
{
    /** @var array<string, array> key => group state (messages, bytes, start_time, config, exchange, routing_key) */
    private array $aggregators = [];
    /** Default limits: default_batch_size, default_timeout_ms, default_max_bytes. */
    private array $config;

    /**
     * @param array $config Overrides for 'default_batch_size' (100),
     *                      'default_timeout_ms' (50) and 'default_max_bytes' (65536).
     */
    public function __construct(array $config = [])
    {
        $this->config = array_merge([
            'default_batch_size' => 100,
            'default_timeout_ms' => 50,
            'default_max_bytes' => 65536,
        ], $config);
    }

    /**
     * Appends $body to the group for $key, creating the group if needed.
     *
     * $exchange, $routingKey and $options (per-group limit overrides) are used
     * only when this call creates the group. Later calls for the same key keep
     * the values recorded by the first call.
     *
     * @return array|null The released batch if a size/byte limit was hit, otherwise null.
     */
    public function add(
        string $key,
        string $body,
        string $exchange = '',
        string $routingKey = '',
        array $options = []
    ): ?array {
        if (!isset($this->aggregators[$key])) {
            $this->aggregators[$key] = [
                'messages' => [],
                'bytes' => 0,
                'start_time' => microtime(true),
                'config' => array_merge($this->config, $options),
                'exchange' => $exchange,
                'routing_key' => $routingKey,
            ];
        }

        $this->aggregators[$key]['messages'][] = [
            'body' => $body,
            'timestamp' => microtime(true),
        ];
        $this->aggregators[$key]['bytes'] += strlen($body);

        return $this->shouldFlush($this->aggregators[$key]) ? $this->flushKey($key) : null;
    }

    /**
     * Releases one group, or every non-empty group when $key is null.
     *
     * @return array|null With a key: that group's batch, or null if it has
     *                    nothing pending. Without a key: the released batches
     *                    indexed by key (possibly empty).
     */
    public function flush(?string $key = null): ?array
    {
        if ($key !== null) {
            return $this->flushKey($key);
        }

        $results = [];
        foreach (array_keys($this->aggregators) as $k) {
            // Numeric-string keys come back from array_keys() as ints; cast so
            // flushKey(string) also accepts them under strict_types.
            $batch = $this->flushKey((string) $k);
            if ($batch !== null) {
                $results[$k] = $batch;
            }
        }

        return $results;
    }

    /**
     * Releases every group whose age (since its first message) has reached its timeout.
     *
     * @return array<string, array> Released batches indexed by key.
     */
    public function checkTimeouts(): array
    {
        $results = [];
        $now = microtime(true);

        foreach ($this->aggregators as $key => $aggregator) {
            $elapsedMs = ($now - $aggregator['start_time']) * 1000;

            if ($elapsedMs >= $aggregator['config']['default_timeout_ms'] && $aggregator['messages'] !== []) {
                $results[$key] = $this->flushKey((string) $key);
            }
        }

        return $results;
    }

    /** Per-key pending message count, pending bytes and group age in ms. */
    public function getStats(): array
    {
        $stats = [];

        foreach ($this->aggregators as $key => $aggregator) {
            $stats[$key] = [
                'pending_messages' => count($aggregator['messages']),
                'pending_bytes' => $aggregator['bytes'],
                'elapsed_ms' => round((microtime(true) - $aggregator['start_time']) * 1000, 2),
            ];
        }

        return $stats;
    }

    /** True when the group has reached its message-count or byte limit. */
    private function shouldFlush(array $aggregator): bool
    {
        $config = $aggregator['config'];

        return count($aggregator['messages']) >= $config['default_batch_size']
            || $aggregator['bytes'] >= $config['default_max_bytes'];
    }

    /**
     * Removes the group for $key and returns it as a batch.
     *
     * @return array{key: string, messages: list<array>, count: int, total_bytes: int, exchange: string, routing_key: string}|null
     *         null when the key is unknown or has no messages.
     */
    private function flushKey(string $key): ?array
    {
        if (!isset($this->aggregators[$key]) || empty($this->aggregators[$key]['messages'])) {
            return null;
        }

        $aggregator = $this->aggregators[$key];
        unset($this->aggregators[$key]);

        return [
            'key' => $key,
            'messages' => $aggregator['messages'],
            'count' => count($aggregator['messages']),
            'total_bytes' => $aggregator['bytes'],
            'exchange' => $aggregator['exchange'],
            'routing_key' => $aggregator['routing_key'],
        ];
    }
}
实际应用场景
场景一:日志批量收集
php
<?php
/**
 * Buffers structured log entries and publishes them to exchange 'logs'
 * (routing key 'app.logs') through a BatchPublisher.
 */
class LogBatchCollector
{
    private BatchPublisher $publisher;

    /**
     * The publisher must be injected. Without a constructor the typed property
     * stayed uninitialized, and every log()/flush() call failed with an Error.
     */
    public function __construct(BatchPublisher $publisher)
    {
        $this->publisher = $publisher;
    }

    /**
     * Encodes one entry as JSON and hands it to the publisher (which may flush).
     *
     * Invalid UTF-8 in the message or context is replaced rather than dropping the entry.
     *
     * @throws \JsonException If the entry cannot be JSON-encoded for another reason.
     */
    public function log(string $level, string $message, array $context = []): void
    {
        $logEntry = json_encode([
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => date('Y-m-d H:i:s'),
        ], JSON_THROW_ON_ERROR | JSON_INVALID_UTF8_SUBSTITUTE);

        $this->publisher->publish($logEntry, 'logs', 'app.logs');
    }

    /** Publishes any buffered entries; call before shutdown so pending logs are not lost. */
    public function flush(): void
    {
        $this->publisher->flush();
    }
}
场景二:订单批量处理
php
<?php
/**
 * Consumes order messages in batches and acks each one according to the
 * outcome of a single batched processing call.
 */
class OrderBatchProcessor
{
    private BatchConsumer $consumer;

    /** The consumer must be injected; the typed property was previously never initialized. */
    public function __construct(BatchConsumer $consumer)
    {
        $this->consumer = $consumer;
    }

    /**
     * Consumes $queue, where each body is expected to be a JSON object with a scalar 'id'.
     *
     * Results per message:
     * - 'reject' (no requeue) for bodies without a usable id, since redelivery cannot fix them;
     * - 'ack' for orders that processOrdersBatch() reports as handled;
     * - 'nack' (requeue) for the rest.
     */
    public function start(string $queue): void
    {
        $this->consumer->consume($queue, function (array $messages): array {
            $acks = [];
            $orderIdsByIndex = [];

            // Decode each body once; previously it was decoded twice, and a malformed
            // body produced a null id that ended up being acked.
            foreach ($messages as $index => $message) {
                $order = json_decode($message->body, true);
                $id = is_array($order) ? ($order['id'] ?? null) : null;

                if (!is_int($id) && !is_string($id)) {
                    $acks[$index] = 'reject';
                    continue;
                }

                $orderIdsByIndex[$index] = $id;
            }

            $results = $orderIdsByIndex === []
                ? []
                : $this->processOrdersBatch(array_values($orderIdsByIndex));

            foreach ($orderIdsByIndex as $index => $orderId) {
                $acks[$index] = isset($results[$orderId]) ? 'ack' : 'nack';
            }

            return $acks;
        });
    }

    /**
     * Processes a batch of orders.
     *
     * @return array<int|string, bool> order id => processed
     */
    private function processOrdersBatch(array $orderIds): array
    {
        // 批量处理订单逻辑 (batch order processing logic goes here)
        return array_fill_keys($orderIds, true);
    }
}
场景三:数据同步
php
<?php
/**
 * Batches per-table record changes through a MessageAggregator and sends each
 * released batch.
 */
class DataSyncBatcher
{
    private MessageAggregator $aggregator;

    /** The aggregator must be injected; the typed property was previously never initialized. */
    public function __construct(MessageAggregator $aggregator)
    {
        $this->aggregator = $aggregator;
    }

    /**
     * Adds one record to the batch for $table and sends the batch once the aggregator releases it.
     *
     * The batch key is "sync_{$table}", the exchange is 'sync' and the routing key is the table name.
     * NOTE(review): batches released only by timeout (MessageAggregator::checkTimeouts())
     * are not sent from here.
     *
     * @throws \JsonException If $record cannot be JSON-encoded (previously an empty body was queued).
     */
    public function syncRecord(string $table, array $record): void
    {
        $key = "sync_{$table}";

        $batch = $this->aggregator->add(
            $key,
            json_encode($record, JSON_THROW_ON_ERROR),
            'sync',
            $table
        );

        if ($batch !== null) {
            $this->sendSyncBatch($batch);
        }
    }

    /** Sends one released batch (stub). */
    private function sendSyncBatch(array $batch): void
    {
        // 发送同步批次 (send the sync batch)
    }
}
常见问题与解决方案
问题一:批量大小选择困难
解决方案:
php
<?php
/**
 * Nudges a batch size toward a target per-batch latency.
 *
 * - Latency below 80% of the target grows the size by about 20%.
 * - Latency above 120% of the target shrinks it by about 20%.
 * - Anything in between leaves it unchanged.
 *
 * The size always stays within [$minBatchSize, $maxBatchSize]. Previously,
 * integer truncation could drive it to 0 (where it stayed forever), sizes
 * below 5 could never grow, and growth was unbounded.
 */
class AdaptiveBatchSizer
{
    private int $currentBatchSize = 100;
    private float $targetLatency = 0.1;
    private int $minBatchSize = 1;
    private int $maxBatchSize = 1000;

    /**
     * @param int   $initialBatchSize Starting size; clamped into the bounds.
     * @param int   $minBatchSize     Lower bound, at least 1.
     * @param int   $maxBatchSize     Upper bound, at least $minBatchSize.
     * @param float $targetLatency    Target latency, in the same unit later passed to adjust().
     * @throws \InvalidArgumentException On invalid bounds.
     */
    public function __construct(
        int $initialBatchSize = 100,
        int $minBatchSize = 1,
        int $maxBatchSize = 1000,
        float $targetLatency = 0.1
    ) {
        if ($minBatchSize < 1) {
            throw new \InvalidArgumentException('minBatchSize must be at least 1');
        }
        if ($maxBatchSize < $minBatchSize) {
            throw new \InvalidArgumentException('maxBatchSize must not be smaller than minBatchSize');
        }

        $this->minBatchSize = $minBatchSize;
        $this->maxBatchSize = $maxBatchSize;
        $this->targetLatency = $targetLatency;
        $this->currentBatchSize = $this->clamp($initialBatchSize);
    }

    /**
     * Updates the batch size from the latency observed for the last batch.
     *
     * @return int The new batch size.
     */
    public function adjust(float $actualLatency): int
    {
        if ($actualLatency < $this->targetLatency * 0.8) {
            // Grow by at least 1 so small sizes are not stuck by (int) truncation.
            $next = max($this->currentBatchSize + 1, (int) ($this->currentBatchSize * 1.2));
        } elseif ($actualLatency > $this->targetLatency * 1.2) {
            $next = (int) ($this->currentBatchSize * 0.8);
        } else {
            $next = $this->currentBatchSize;
        }

        $this->currentBatchSize = $this->clamp($next);

        return $this->currentBatchSize;
    }

    /** Restricts $size to [$minBatchSize, $maxBatchSize]. */
    private function clamp(int $size): int
    {
        return max($this->minBatchSize, min($this->maxBatchSize, $size));
    }
}
问题二:批量失败处理
解决方案:
php
<?php
/**
 * Recovers from a failed batch by retrying its messages one at a time.
 *
 * The original class called an undefined processSingle() method. The
 * per-message processor is now supplied through the constructor.
 */
class BatchErrorHandler
{
    /** Per-message processor; null when none was configured. */
    private ?\Closure $processor;

    /**
     * @param callable|null $processor Called with one message; should throw an \Exception on failure.
     */
    public function __construct(?callable $processor = null)
    {
        $this->processor = $processor === null ? null : \Closure::fromCallable($processor);
    }

    /**
     * Retries every message individually after a batch failure.
     *
     * @param array      $messages Messages of the failed batch; keys are preserved in the result.
     * @param \Exception $e        The batch-level failure (accepted for context, not inspected).
     * @return array<int|string, string> key => 'ack' if the retry succeeded, 'nack' if it threw.
     * @throws \LogicException If no processor was configured.
     */
    public function handleBatchFailure(array $messages, \Exception $e): array
    {
        if ($this->processor === null) {
            throw new \LogicException('BatchErrorHandler requires a per-message processor');
        }

        $results = [];

        foreach ($messages as $index => $message) {
            try {
                $this->processSingle($message);
                $results[$index] = 'ack';
            } catch (\Exception $singleE) {
                $results[$index] = 'nack';
            }
        }

        return $results;
    }

    /** Runs the configured processor on one message. */
    private function processSingle($message): void
    {
        ($this->processor)($message);
    }
}
最佳实践建议
批量大小选择
| 场景 | 推荐批量大小 |
|---|---|
| 低延迟要求 | 10-50 |
| 平衡场景 | 50-200 |
| 高吞吐量 | 200-1000 |
| 大消息 | 较小批量 |
超时设置
| 场景 | 推荐超时 |
|---|---|
| 实时系统 | 10-30ms |
| 一般系统 | 50-100ms |
| 批处理系统 | 100-500ms |
确认策略
| 场景 | 推荐策略 |
|---|---|
| 可靠性优先 | 每批确认 |
| 性能优先 | 多批确认 |
| 极致性能 | 异步确认 |
