
RabbitMQ Batch Processing Optimization

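Overview

Batch processing is an effective way to raise RabbitMQ throughput. Used appropriately, batch publishing, batch confirms, and batch consumption cut the number of network round trips and the per-message protocol overhead, which can improve system performance substantially. The actual gain depends on message size, network latency, and how confirms are handled.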

Core Concepts

Advantages of Batch Processing

┌─────────────────────────────────────────────────────────────┐
│             Single-message vs. batch processing             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Per-message publishing:                                    │
│  ┌───┐   ┌───┐   ┌───┐   ┌───┐   ┌───┐                      │
│  │ M │──▶│ M │──▶│ M │──▶│ M │──▶│ M │   5 round trips      │
│  └───┘   └───┘   └───┘   └───┘   └───┘                      │
│                                                             │
│  Batch publishing:                                          │
│  ┌───┬───┬───┬───┬───┐                                      │
│  │ M │ M │ M │ M │ M │──────────────────▶  1 round trip     │
│  └───┴───┴───┴───┴───┘                                      │
│                                                             │
│  Performance gains:                                         │
│  - Network round trips: 80%+ fewer                          │
│  - Protocol overhead: 80%+ lower                            │
│  - Throughput: 2-5x higher                                  │
└─────────────────────────────────────────────────────────────┘
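
The arithmetic behind the diagram can be checked with a small sketch. The helper name `estimateBatchSavings` is hypothetical, and the model is deliberately simple: it assumes every unbatched message costs one network write and every flushed batch costs one write, which ignores TCP buffering and confirm traffic.

```php
<?php

/**
 * Estimate how many network writes a workload needs with and without batching.
 * Assumption: one write per unbatched message, one write per flushed batch.
 */
function estimateBatchSavings(int $totalMessages, int $batchSize): array
{
    if ($totalMessages < 0 || $batchSize < 1) {
        throw new InvalidArgumentException('totalMessages must be >= 0 and batchSize >= 1');
    }

    $batchedWrites = (int) ceil($totalMessages / $batchSize);
    $reduction = $totalMessages > 0
        ? round((1 - $batchedWrites / $totalMessages) * 100, 1)
        : 0.0;

    return [
        'unbatched_writes' => $totalMessages,
        'batched_writes' => $batchedWrites,
        'reduction_percent' => $reduction,
    ];
}

// The five-message case from the diagram: 5 writes become 1.
$savings = estimateBatchSavings(5, 5);
printf(
    "%d -> %d writes (%.1f%% fewer)\n",
    $savings['unbatched_writes'],
    $savings['batched_writes'],
    $savings['reduction_percent']
);
```

Under this model the reduction approaches 100% as the batch grows, so the remaining cost is dominated by payload size and broker-side work rather than by the number of writes.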

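Types of Batch Processing

| Type | Description | Use case |
|------|-------------|----------|
| Batch publishing | Send multiple messages at once | High-throughput producers |
| Batch confirms | Confirm multiple messages at once | Reducing confirm overhead |
| Batch consumption | Consume multiple messages at once | High-throughput consumers |
| Transactional batches | Commit a batch in one transaction | Strict atomicity requirements |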

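Batch Processing Parameters

| Parameter | Default | Description | Recommended value |
|-----------|---------|-------------|-------------------|
| Batch size | - | Messages per batch | 50-500 |
| Batch bytes | - | Maximum bytes per batch | 64 KB-1 MB |
| Batch timeout | - | Maximum wait while filling a batch | 10-100 ms |
| Confirm batch | - | Messages per confirm | 10-100 |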
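
Which of the three limits actually triggers a flush depends on the traffic. The sketch below estimates that for a steady stream; `estimateFlushTrigger` is a hypothetical helper, and it assumes a constant arrival rate and uniform message size, so treat the result as a rough guide rather than a measurement.

```php
<?php

/**
 * Estimate which limit flushes a batch first for a steady message stream.
 * Assumption: messages arrive at a constant rate and all have the same size.
 */
function estimateFlushTrigger(
    float $messagesPerSecond,
    int $messageBytes,
    int $batchSize,
    int $batchMaxBytes,
    int $batchTimeoutMs
): array {
    if ($messagesPerSecond <= 0 || $messageBytes < 1) {
        throw new InvalidArgumentException('rate and message size must be positive');
    }

    // Approximate time to collect enough messages to reach each limit (ms).
    $waits = [
        'count' => $batchSize / $messagesPerSecond * 1000,
        'bytes' => ceil($batchMaxBytes / $messageBytes) / $messagesPerSecond * 1000,
        'timeout' => (float) $batchTimeoutMs,
    ];

    asort($waits); // smallest wait first; keys are preserved
    $trigger = array_key_first($waits);

    return ['trigger' => $trigger, 'wait_ms' => round($waits[$trigger], 2)];
}

// 1 KB messages, batch size 100, 64 KB byte limit, 50 ms timeout.
$busy = estimateFlushTrigger(10000.0, 1024, 100, 65536, 50);
$quiet = estimateFlushTrigger(500.0, 1024, 100, 65536, 50);
printf("busy: %s after %.2f ms\n", $busy['trigger'], $busy['wait_ms']);
printf("quiet: %s after %.2f ms\n", $quiet['trigger'], $quiet['wait_ms']);
```

With 1 KB messages the 64 KB byte limit is reached after 64 messages, so on the busy stream it fires before the 100-message count limit. On the quiet stream neither limit is reached within 50 ms, and the timeout decides the latency.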

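Batch Processing Caveats

  1. Atomicity: a batch is not atomic; some of its messages can succeed while others fail.
  2. Ordering: messages within a batch keep their order.
  3. Acknowledgement: a single failed message can affect the whole batch.
  4. Memory: batching uses more memory, because messages are buffered until the batch is flushed.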

Configuration Examples

Producer Batch Configuration

php
<?php

namespace App\RabbitMQ\Batch;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class BatchPublisher
{
    private AMQPChannel $channel;
    private int $batchSize;
    private int $batchMaxBytes;
    private int $batchTimeoutMs;
    private array $pendingMessages = [];
    private int $pendingBytes = 0;
    private ?float $batchStartTime = null;
    
    public function __construct(
        AMQPChannel $channel,
        int $batchSize = 100,
        int $batchMaxBytes = 65536,
        int $batchTimeoutMs = 50
    ) {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->batchMaxBytes = $batchMaxBytes;
        $this->batchTimeoutMs = $batchTimeoutMs;
    }
    
    public function publish(
        string $body,
        string $exchange = '',
        string $routingKey = '',
        array $properties = []
    ): bool {
        $this->startBatchIfNeeded();
        
        $messageSize = strlen($body);
        
        $this->pendingMessages[] = [
            'body' => $body,
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'properties' => $properties,
        ];
        
        $this->pendingBytes += $messageSize;
        
        if ($this->shouldFlush()) {
            $this->flush();
            return true;
        }
        
        return false;
    }

    public function flush(): int
    {
        if (empty($this->pendingMessages)) {
            return 0;
        }
        
        $count = 0;
        
        // Queue every pending message in the channel's batch buffer first...
        foreach ($this->pendingMessages as $msg) {
            $message = new AMQPMessage($msg['body'], $msg['properties']);
            $this->channel->batch_basic_publish(
                $message,
                $msg['exchange'],
                $msg['routing_key']
            );
            $count++;
        }
        
        // ...then send the buffered frames to the broker in a single write.
        $this->channel->publish_batch();
        
        $this->pendingMessages = [];
        $this->pendingBytes = 0;
        $this->batchStartTime = null;
        
        return $count;
    }

    public function checkTimeout(): bool
    {
        if ($this->batchStartTime === null || empty($this->pendingMessages)) {
            return false;
        }
        
        $elapsed = (microtime(true) - $this->batchStartTime) * 1000;
        
        if ($elapsed >= $this->batchTimeoutMs) {
            $this->flush();
            return true;
        }
        
        return false;
    }

    public function getPendingCount(): int
    {
        return count($this->pendingMessages);
    }

    private function startBatchIfNeeded(): void
    {
        if ($this->batchStartTime === null) {
            $this->batchStartTime = microtime(true);
        }
    }

    private function shouldFlush(): bool
    {
        return count($this->pendingMessages) >= $this->batchSize ||
               $this->pendingBytes >= $this->batchMaxBytes;
    }
}

Batch Confirm Configuration

php
<?php

namespace App\RabbitMQ\Batch;

use PhpAmqpLib\Channel\AMQPChannel;

class BatchConfirmer
{
    private AMQPChannel $channel;
    private int $batchSize;
    private int $pendingAcks = 0;
    private array $unconfirmedMessages = [];
    private int $nextPublishSeqNo = 1;
    
    public function __construct(AMQPChannel $channel, int $batchSize = 100)
    {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->enableConfirmMode();
    }
    
    public function trackPublish(?string $messageId = null): int
    {
        $seqNo = $this->nextPublishSeqNo++;
        
        $this->unconfirmedMessages[$seqNo] = [
            'message_id' => $messageId,
            'timestamp' => microtime(true),
        ];
        
        $this->pendingAcks++;
        
        return $seqNo;
    }

    public function waitForConfirms(int $timeoutMs = 5000): array
    {
        $confirmed = [];
        $nacked = [];
        
        // php-amqplib invokes these handlers once per settled message and passes
        // the published AMQPMessage; it expands the broker's "multiple" flag itself.
        $this->channel->set_ack_handler(function (\PhpAmqpLib\Message\AMQPMessage $message) use (&$confirmed) {
            $this->settle($message, $confirmed);
        });
        
        $this->channel->set_nack_handler(function (\PhpAmqpLib\Message\AMQPMessage $message) use (&$nacked) {
            $this->settle($message, $nacked);
        });
        
        $this->channel->wait_for_pending_acks_returns($timeoutMs / 1000);
        
        return [
            'confirmed' => $confirmed,
            'nacked' => $nacked,
            'total' => count($this->unconfirmedMessages) + count($confirmed) + count($nacked),
        ];
    }

    public function getPendingCount(): int
    {
        return count($this->unconfirmedMessages);
    }

    private function enableConfirmMode(): void
    {
        $this->channel->confirm_select();
    }

    /**
     * Move the tracked entry for a settled message into $bucket.
     */
    private function settle(\PhpAmqpLib\Message\AMQPMessage $message, array &$bucket): void
    {
        $seqNo = null;
        
        // Prefer an exact match on the message_id property when one was set.
        if ($message->has('message_id')) {
            $id = $message->get('message_id');
            foreach ($this->unconfirmedMessages as $candidate => $info) {
                if ($info['message_id'] === $id) {
                    $seqNo = $candidate;
                    break;
                }
            }
        }
        
        // Fallback (assumption): confirms arrive in publish order, so settle the oldest entry.
        if ($seqNo === null) {
            $seqNo = array_key_first($this->unconfirmedMessages);
        }
        
        if ($seqNo !== null) {
            $bucket[] = $this->unconfirmedMessages[$seqNo];
            unset($this->unconfirmedMessages[$seqNo]);
            $this->pendingAcks--;
        }
    }
}
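
At the protocol level a publisher confirm carries a delivery tag and a `multiple` flag: with `multiple` set, the confirm covers every outstanding sequence number up to and including that tag. The following library-free sketch shows that rule in isolation; `settleConfirms` and the shape of `$outstanding` are assumptions made for illustration.

```php
<?php

/**
 * Resolve which outstanding sequence numbers a broker confirm covers.
 * $outstanding maps sequence number => application payload (hypothetical shape).
 * Returns [settled entries, entries that are still outstanding].
 */
function settleConfirms(array $outstanding, int $deliveryTag, bool $multiple): array
{
    $settled = [];

    foreach ($outstanding as $seqNo => $payload) {
        $covered = $multiple ? $seqNo <= $deliveryTag : $seqNo === $deliveryTag;
        if ($covered) {
            $settled[$seqNo] = $payload;
            unset($outstanding[$seqNo]);
        }
    }

    return [$settled, $outstanding];
}

$outstanding = [1 => 'a', 2 => 'b', 3 => 'c', 4 => 'd'];

// A confirm for tag 3 with multiple=true settles 1, 2 and 3 in one step.
[$settled, $outstanding] = settleConfirms($outstanding, 3, true);
printf("settled: %s\n", implode(',', array_keys($settled)));
printf("outstanding: %s\n", implode(',', array_keys($outstanding)));
```

This is why one confirm frame can settle a whole batch: the broker is free to acknowledge many publishes with a single tag.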

Batch Consumer Configuration

php
<?php

namespace App\RabbitMQ\Batch;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class BatchConsumer
{
    private AMQPChannel $channel;
    private int $batchSize;
    private int $batchTimeoutMs;
    private array $batch = [];
    private ?float $batchStartTime = null;
    
    public function __construct(
        AMQPChannel $channel,
        int $batchSize = 50,
        int $batchTimeoutMs = 100
    ) {
        $this->channel = $channel;
        $this->batchSize = $batchSize;
        $this->batchTimeoutMs = $batchTimeoutMs;
    }
    
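    public function consume(
        string $queue,
        callable $batchCallback,
        ?int $prefetchCount = null
    ): void {
        // A prefetch count below the batch size means batches only flush on timeout.
        if ($prefetchCount !== null) {
            $this->channel->basic_qos(null, $prefetchCount, null);
        }
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($batchCallback) {
                $this->addToBatch($message, $batchCallback);
            }
        );
        
        while ($this->channel->is_consuming()) {
            try {
                $this->channel->wait(null, false, $this->batchTimeoutMs / 1000);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // No delivery arrived within the batch timeout; fall through
                // so a partially filled batch can still be flushed.
            }
            $this->checkBatchTimeout($batchCallback);
        }
    }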

    public function consumeWithTimeout(
        string $queue,
        callable $batchCallback,
        int $totalTimeoutMs
    ): int {
        $startTime = microtime(true);
        $processed = 0;
        
        $this->channel->basic_qos(null, $this->batchSize, null);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($batchCallback, &$processed) {
                $this->addToBatch($message, $batchCallback);
                $processed++;
            }
        );
        
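        while (microtime(true) - $startTime < $totalTimeoutMs / 1000) {
            if ($this->channel->is_consuming()) {
                try {
                    $this->channel->wait(null, false, 0.1);
                } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                    // Idle for 100 ms; keep looping until the total timeout expires.
                }
            }
            $this->checkBatchTimeout($batchCallback);
        }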
        
        $this->flushBatch($batchCallback);
        
        return $processed;
    }

    private function addToBatch(AMQPMessage $message, callable $batchCallback): void
    {
        if ($this->batchStartTime === null) {
            $this->batchStartTime = microtime(true);
        }
        
        $this->batch[] = $message;
        
        if (count($this->batch) >= $this->batchSize) {
            $this->flushBatch($batchCallback);
        }
    }

    private function checkBatchTimeout(callable $batchCallback): void
    {
        if ($this->batchStartTime === null || empty($this->batch)) {
            return;
        }
        
        $elapsed = (microtime(true) - $this->batchStartTime) * 1000;
        
        if ($elapsed >= $this->batchTimeoutMs) {
            $this->flushBatch($batchCallback);
        }
    }

    private function flushBatch(callable $batchCallback): void
    {
        if (empty($this->batch)) {
            return;
        }
        
        $messages = $this->batch;
        $this->batch = [];
        $this->batchStartTime = null;
        
        $results = $batchCallback($messages);
        
        $this->handleBatchResults($messages, $results);
    }

    private function handleBatchResults(array $messages, array $results): void
    {
        foreach ($messages as $index => $message) {
            $result = $results[$index] ?? 'ack';
            
            switch ($result) {
                case 'ack':
                    $message->ack();
                    break;
                case 'nack':
                    $message->nack(true);
                    break;
                case 'reject':
                    $message->reject(false);
                    break;
            }
        }
    }
}
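
When most messages in a batch succeed, the successful prefix can be acknowledged with one frame instead of one frame per message. The sketch below only plans the acknowledgements; `planBatchAcks` is a hypothetical helper, and it assumes that all earlier deliveries on the channel are already settled, because an ack with `multiple` set covers every unacknowledged delivery up to the given tag.

```php
<?php

/**
 * Plan acknowledgements for a batch so a clean prefix is acked in one frame.
 * $results maps delivery tag => 'ack' | 'nack' | 'reject'.
 */
function planBatchAcks(array $results): array
{
    ksort($results); // delivery tags increase in delivery order

    $ackUpTo = null;
    $individual = [];
    $prefixOpen = true;

    foreach ($results as $tag => $result) {
        if ($prefixOpen && $result === 'ack') {
            $ackUpTo = $tag; // still inside the all-success prefix
            continue;
        }
        $prefixOpen = false;
        $individual[$tag] = $result; // settle these one by one
    }

    return ['ack_up_to' => $ackUpTo, 'individual' => $individual];
}

$plan = planBatchAcks([11 => 'ack', 12 => 'ack', 13 => 'nack', 14 => 'ack']);
printf("ack up to: %s\n", var_export($plan['ack_up_to'], true));
printf("individually: %s\n", json_encode($plan['individual']));
```

With php-amqplib the prefix could then be settled with `$channel->basic_ack($plan['ack_up_to'], true)`, and the remaining entries handled per message as `handleBatchResults` already does.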

PHP Code Examples

Complete Batch Processing System

php
<?php

namespace App\RabbitMQ\Batch;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Channel\AMQPChannel;

class BatchProcessingSystem
{
    private AMQPStreamConnection $connection;
    private AMQPChannel $channel;
    private BatchPublisher $publisher;
    private BatchConsumer $consumer;
    private BatchConfirmer $confirmer;
    
    public function __construct(
        AMQPStreamConnection $connection,
        int $publishBatchSize = 100,
        int $consumeBatchSize = 50
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        
        $this->publisher = new BatchPublisher(
            $this->channel,
            $publishBatchSize,
            65536,
            50
        );
        
        $this->consumer = new BatchConsumer(
            $this->channel,
            $consumeBatchSize,
            100
        );
        
        $this->confirmer = new BatchConfirmer($this->channel, $publishBatchSize);
    }
    
    public function publishBatch(
        array $messages,
        string $exchange = '',
        string $routingKey = '',
        bool $waitForConfirm = true
    ): array {
        $published = 0;
        $sequenceNumbers = [];
        
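        foreach ($messages as $message) {
            $body = is_array($message) ? json_encode($message) : $message;
            
            // Tag each message with an ID so a confirm can be matched to it later.
            $messageId = uniqid('msg_', true);
            
            $this->publisher->publish($body, $exchange, $routingKey, [
                'delivery_mode' => 2,
                'message_id' => $messageId,
            ]);
            
            $seqNo = $this->confirmer->trackPublish($messageId);
            $sequenceNumbers[] = $seqNo;
            $published++;
        }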
        
        $this->publisher->flush();
        
        if ($waitForConfirm) {
            $confirmResult = $this->confirmer->waitForConfirms();
            
            return [
                'published' => $published,
                'confirmed' => count($confirmResult['confirmed']),
                'nacked' => count($confirmResult['nacked']),
                'sequence_numbers' => $sequenceNumbers,
            ];
        }
        
        return [
            'published' => $published,
            'sequence_numbers' => $sequenceNumbers,
        ];
    }

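    /**
     * Note: $maxMessages only switches to the time-boxed consumer (30 s);
     * it is not enforced as a message limit.
     */
    public function consumeBatch(
        string $queue,
        callable $processor,
        ?int $maxMessages = null
    ): array {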
        $processed = 0;
        $errors = [];
        $startTime = microtime(true);
        
        $batchProcessor = function (array $messages) use ($processor, &$processed, &$errors) {
            $results = [];
            
            foreach ($messages as $index => $message) {
                try {
                    $processor($message->body);
                    $results[$index] = 'ack';
                    $processed++;
                } catch (\Exception $e) {
                    $results[$index] = 'nack';
                    $errors[] = [
                        'message' => $message->body,
                        'error' => $e->getMessage(),
                    ];
                }
            }
            
            return $results;
        };
        
        if ($maxMessages !== null) {
            $this->consumer->consumeWithTimeout($queue, $batchProcessor, 30000);
        } else {
            $this->consumer->consume($queue, $batchProcessor);
        }
        
        return [
            'processed' => $processed,
            'errors' => $errors,
            'duration' => round(microtime(true) - $startTime, 3),
            'rate' => round($processed / (microtime(true) - $startTime), 2),
        ];
    }

    public function getStats(): array
    {
        return [
            'publisher' => [
                'pending' => $this->publisher->getPendingCount(),
            ],
            'confirmer' => [
                'pending' => $this->confirmer->getPendingCount(),
            ],
        ];
    }
}

Batch Performance Tests

php
<?php

namespace App\RabbitMQ\Performance;

use App\RabbitMQ\Batch\BatchPublisher;
use App\RabbitMQ\Batch\BatchConsumer;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class BatchPerformanceTester
{
    private AMQPStreamConnection $connection;
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
    }
    
    public function compareBatchSizes(
        string $queueName,
        int $totalMessages,
        array $batchSizes
    ): array {
        $results = [];
        
        foreach ($batchSizes as $batchSize) {
            $results[$batchSize] = $this->testBatchSize($queueName, $totalMessages, $batchSize);
        }
        
        return $results;
    }

    private function testBatchSize(
        string $queueName,
        int $totalMessages,
        int $batchSize
    ): array {
        $channel = $this->connection->channel();
        $channel->queue_declare($queueName, false, false, false, true);
        
        $message = str_repeat('x', 1024);
        
        $publishStart = microtime(true);
        
        $batch = [];
        for ($i = 0; $i < $totalMessages; $i++) {
            $batch[] = $message;
            
            if (count($batch) >= $batchSize) {
                $this->publishBatch($channel, $queueName, $batch);
                $batch = [];
            }
        }
        
        if (!empty($batch)) {
            $this->publishBatch($channel, $queueName, $batch);
        }
        
        $publishTime = microtime(true) - $publishStart;
        
        $consumeStart = microtime(true);
        $consumed = 0;
        
        $channel->basic_qos(null, $batchSize, null);
        $channel->basic_consume($queueName, '', false, false, false, false,
            function ($msg) use (&$consumed) {
                $consumed++;
                $msg->ack();
            }
        );
        
        while ($consumed < $totalMessages) {
            $channel->wait();
        }
        
        $consumeTime = microtime(true) - $consumeStart;
        
        $channel->queue_delete($queueName);
        $channel->close();
        
        return [
            'batch_size' => $batchSize,
            'total_messages' => $totalMessages,
            'publish_time' => round($publishTime, 3),
            'consume_time' => round($consumeTime, 3),
            'publish_rate' => round($totalMessages / $publishTime, 2),
            'consume_rate' => round($totalMessages / $consumeTime, 2),
            'network_roundtrips' => ceil($totalMessages / $batchSize),
        ];
    }

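    private function publishBatch($channel, string $queue, array $messages): void
    {
        // Buffer the messages on the channel, then send them in one socket write.
        foreach ($messages as $msg) {
            $amqpMsg = new AMQPMessage($msg);
            $channel->batch_basic_publish($amqpMsg, '', $queue);
        }
        
        $channel->publish_batch();
    }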

    public function testBatchConfirm(
        string $queueName,
        int $totalMessages,
        int $batchSize
    ): array {
        $channel = $this->connection->channel();
        $channel->queue_declare($queueName, false, false, false, true);
        $channel->confirm_select();
        
        $message = str_repeat('x', 1024);
        
        $start = microtime(true);
        $published = 0;
        $confirmed = 0;
        
        while ($published < $totalMessages) {
            $batchCount = min($batchSize, $totalMessages - $published);
            
            for ($i = 0; $i < $batchCount; $i++) {
                $msg = new AMQPMessage($message);
                $channel->basic_publish($msg, '', $queueName);
                $published++;
            }
            
            $channel->wait_for_pending_acks();
            $confirmed += $batchCount;
        }
        
        $totalTime = microtime(true) - $start;
        
        $channel->queue_delete($queueName);
        $channel->close();
        
        return [
            'batch_size' => $batchSize,
            'total_messages' => $totalMessages,
            'total_time' => round($totalTime, 3),
            'rate' => round($totalMessages / $totalTime, 2),
            'confirm_batches' => ceil($totalMessages / $batchSize),
        ];
    }

    public function testBatchTimeout(
        string $queueName,
        int $messageCount,
        int $batchSize,
        int $batchTimeoutMs
    ): array {
        $channel = $this->connection->channel();
        $channel->queue_declare($queueName, false, false, false, true);
        
        $publisher = new BatchPublisher($channel, $batchSize, 65536, $batchTimeoutMs);
        
        $start = microtime(true);
        
        for ($i = 0; $i < $messageCount; $i++) {
            $publisher->publish('message_' . $i, '', $queueName);
            // Flush a partially filled batch once the timeout has elapsed.
            $publisher->checkTimeout();
        }
        
        $publisher->flush();
        
        $totalTime = microtime(true) - $start;
        
        $channel->queue_delete($queueName);
        $channel->close();
        
        return [
            'message_count' => $messageCount,
            'batch_size' => $batchSize,
            'batch_timeout_ms' => $batchTimeoutMs,
            'total_time' => round($totalTime, 3),
            'rate' => round($messageCount / $totalTime, 2),
        ];
    }
}

Batch Message Aggregator

php
<?php

namespace App\RabbitMQ\Batch;

class MessageAggregator
{
    private array $aggregators = [];
    private array $config;
    
    public function __construct(array $config = [])
    {
        $this->config = array_merge([
            'default_batch_size' => 100,
            'default_timeout_ms' => 50,
            'default_max_bytes' => 65536,
        ], $config);
    }
    
    public function add(
        string $key,
        string $body,
        string $exchange = '',
        string $routingKey = '',
        array $options = []
    ): ?array {
        if (!isset($this->aggregators[$key])) {
            $this->aggregators[$key] = [
                'messages' => [],
                'bytes' => 0,
                'start_time' => microtime(true),
                'config' => array_merge($this->config, $options),
                'exchange' => $exchange,
                'routing_key' => $routingKey,
            ];
        }
        
        $aggregator = &$this->aggregators[$key];
        $messageSize = strlen($body);
        
        $aggregator['messages'][] = [
            'body' => $body,
            'timestamp' => microtime(true),
        ];
        $aggregator['bytes'] += $messageSize;
        
        if ($this->shouldFlush($aggregator)) {
            return $this->flush($key);
        }
        
        return null;
    }

    public function flush(?string $key = null): ?array
    {
        if ($key !== null) {
            return $this->flushKey($key);
        }
        
        $results = [];
        foreach (array_keys($this->aggregators) as $k) {
            $result = $this->flushKey($k);
            if ($result !== null) {
                $results[$k] = $result;
            }
        }
        return $results;
    }

    public function checkTimeouts(): array
    {
        $results = [];
        $now = microtime(true);
        
        foreach ($this->aggregators as $key => $aggregator) {
            $elapsed = ($now - $aggregator['start_time']) * 1000;
            
            if ($elapsed >= $aggregator['config']['default_timeout_ms'] && 
                !empty($aggregator['messages'])) {
                $results[$key] = $this->flush($key);
            }
        }
        
        return $results;
    }

    public function getStats(): array
    {
        $stats = [];
        
        foreach ($this->aggregators as $key => $aggregator) {
            $stats[$key] = [
                'pending_messages' => count($aggregator['messages']),
                'pending_bytes' => $aggregator['bytes'],
                'elapsed_ms' => round((microtime(true) - $aggregator['start_time']) * 1000, 2),
            ];
        }
        
        return $stats;
    }

    private function shouldFlush(array $aggregator): bool
    {
        $config = $aggregator['config'];
        
        return count($aggregator['messages']) >= $config['default_batch_size'] ||
               $aggregator['bytes'] >= $config['default_max_bytes'];
    }

    private function flushKey(string $key): ?array
    {
        if (!isset($this->aggregators[$key]) || empty($this->aggregators[$key]['messages'])) {
            return null;
        }
        
        $aggregator = $this->aggregators[$key];
        
        unset($this->aggregators[$key]);
        
        return [
            'key' => $key,
            'messages' => $aggregator['messages'],
            'count' => count($aggregator['messages']),
            'total_bytes' => $aggregator['bytes'],
            'exchange' => $aggregator['exchange'],
            'routing_key' => $aggregator['routing_key'],
        ];
    }
}

Real-World Scenarios

Scenario 1: Batched Log Collection

php
<?php

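use App\RabbitMQ\Batch\BatchPublisher;

class LogBatchCollector
{
    private BatchPublisher $publisher;
    
    public function __construct(BatchPublisher $publisher)
    {
        $this->publisher = $publisher;
    }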
    
    public function log(string $level, string $message, array $context = []): void
    {
        $logEntry = json_encode([
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => date('Y-m-d H:i:s'),
        ]);
        
        $this->publisher->publish($logEntry, 'logs', 'app.logs');
    }

    public function flush(): void
    {
        $this->publisher->flush();
    }
}

Scenario 2: Batched Order Processing

php
<?php

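use App\RabbitMQ\Batch\BatchConsumer;

class OrderBatchProcessor
{
    private BatchConsumer $consumer;
    
    public function __construct(BatchConsumer $consumer)
    {
        $this->consumer = $consumer;
    }
    
    public function start(string $queue): void
    {
        $this->consumer->consume($queue, function (array $messages) {
            // Decode each message once and remember which order ID belongs to which index.
            $orderIdsByIndex = [];
            
            foreach ($messages as $index => $message) {
                $order = json_decode($message->body, true);
                if (is_array($order) && isset($order['id'])) {
                    $orderIdsByIndex[$index] = $order['id'];
                }
            }
            
            $results = $this->processOrdersBatch(array_values($orderIdsByIndex));
            
            $acks = [];
            foreach ($messages as $index => $message) {
                if (!isset($orderIdsByIndex[$index])) {
                    // Malformed payload: a retry cannot help, so reject without requeueing.
                    $acks[$index] = 'reject';
                    continue;
                }
                $acks[$index] = isset($results[$orderIdsByIndex[$index]]) ? 'ack' : 'nack';
            }
            
            return $acks;
        });
    }

    private function processOrdersBatch(array $orderIds): array
    {
        // Batch order-processing logic goes here
        return array_fill_keys($orderIds, true);
    }
}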

Scenario 3: Data Synchronization

php
<?php

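use App\RabbitMQ\Batch\MessageAggregator;

class DataSyncBatcher
{
    private MessageAggregator $aggregator;
    
    public function __construct(MessageAggregator $aggregator)
    {
        $this->aggregator = $aggregator;
    }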
    
    public function syncRecord(string $table, array $record): void
    {
        $key = "sync_{$table}";
        
        $batch = $this->aggregator->add(
            $key,
            json_encode($record),
            'sync',
            $table
        );
        
        if ($batch !== null) {
            $this->sendSyncBatch($batch);
        }
    }

    private function sendSyncBatch(array $batch): void
    {
        // Send the sync batch
    }
}

Common Problems and Solutions

Problem 1: Choosing a Batch Size

Solution

php
<?php

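class AdaptiveBatchSizer
{
    private int $currentBatchSize = 100;
    private float $targetLatency = 0.1; // same unit as $actualLatency
    private int $minBatchSize = 10;     // bounds taken from the recommendations below
    private int $maxBatchSize = 1000;
    
    public function adjust(float $actualLatency): int
    {
        if ($actualLatency < $this->targetLatency * 0.8) {
            // Comfortably under target: grow by 20%
            $this->currentBatchSize = (int) ceil($this->currentBatchSize * 1.2);
        } elseif ($actualLatency > $this->targetLatency * 1.2) {
            // Over target: shrink by 20%
            $this->currentBatchSize = (int) floor($this->currentBatchSize * 0.8);
        }
        
        // Clamp so repeated adjustments can neither reach zero nor grow without bound.
        $this->currentBatchSize = max(
            $this->minBatchSize,
            min($this->maxBatchSize, $this->currentBatchSize)
        );
        
        return $this->currentBatchSize;
    }
}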

Problem 2: Handling Batch Failures

Solution

php
<?php

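class BatchErrorHandler
{
    /** @var callable Processes one message and throws on failure. */
    private $processSingle;
    
    public function __construct(callable $processSingle)
    {
        $this->processSingle = $processSingle;
    }
    
    /**
     * Fall back to per-message processing after a batch attempt failed with $e.
     */
    public function handleBatchFailure(array $messages, \Exception $e): array
    {
        $results = [];
        
        foreach ($messages as $index => $message) {
            try {
                ($this->processSingle)($message);
                $results[$index] = 'ack';
            } catch (\Exception $singleE) {
                $results[$index] = 'nack';
            }
        }
        
        return $results;
    }
}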
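
An alternative to retrying every message individually is to bisect the failed batch, which needs fewer attempts when only a few messages are bad. This is a different technique from the handler above, shown only as a sketch: `isolateFailures` and `$processBatch` are hypothetical names, and the approach assumes processing is idempotent, since messages in a healthy half are processed again.

```php
<?php

/**
 * Isolate failing messages by repeatedly halving a failed batch.
 * $processBatch is a hypothetical callable that throws when any message fails.
 * Returns the original indexes of the messages that fail on their own.
 */
function isolateFailures(array $messages, callable $processBatch): array
{
    if ($messages === []) {
        return [];
    }

    try {
        $processBatch(array_values($messages));
        return []; // the whole slice succeeded
    } catch (\Throwable $e) {
        if (count($messages) === 1) {
            return array_keys($messages); // a single bad message
        }
    }

    $half = intdiv(count($messages), 2);
    $left = array_slice($messages, 0, $half, true);   // keep original indexes
    $right = array_slice($messages, $half, null, true);

    return array_merge(
        isolateFailures($left, $processBatch),
        isolateFailures($right, $processBatch)
    );
}

// Toy processor: negative numbers stand in for messages that cannot be processed.
$processBatch = function (array $batch): void {
    foreach ($batch as $value) {
        if ($value < 0) {
            throw new RuntimeException('bad message');
        }
    }
};

$failed = isolateFailures([1, 2, -3, 4, -5], $processBatch);
printf("failed indexes: %s\n", implode(',', $failed));
```

The returned indexes can be mapped to `'nack'` or `'reject'` and every other index to `'ack'`, which matches the result format `handleBatchFailure` produces.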

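Best Practices

Choosing a Batch Size

| Scenario | Recommended batch size |
|----------|------------------------|
| Low-latency requirements | 10-50 |
| Balanced workloads | 50-200 |
| High throughput | 200-1000 |
| Large messages | Smaller batches |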

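Timeout Settings

| Scenario | Recommended timeout |
|----------|---------------------|
| Real-time systems | 10-30 ms |
| General-purpose systems | 50-100 ms |
| Batch-processing systems | 100-500 ms |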

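Confirm Strategy

| Scenario | Recommended strategy |
|----------|----------------------|
| Reliability first | Confirm every batch |
| Performance first | Confirm every few batches |
| Maximum performance | Asynchronous confirms |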
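
One way to express all three strategies with the same mechanism is a cap on unconfirmed publishes. The class below is a library-free sketch; `ConfirmWindow` and its method names are hypothetical, and wiring `onConfirm()` to the channel's ack handler is left to the caller.

```php
<?php

/**
 * Track unconfirmed publishes and signal when the publisher should pause.
 * Hypothetical helper: not part of php-amqplib or any other library.
 */
final class ConfirmWindow
{
    private int $maxInFlight;
    private int $inFlight = 0;

    public function __construct(int $maxInFlight)
    {
        if ($maxInFlight < 1) {
            throw new InvalidArgumentException('maxInFlight must be >= 1');
        }
        $this->maxInFlight = $maxInFlight;
    }

    public function onPublish(): void
    {
        $this->inFlight++;
    }

    public function onConfirm(int $count = 1): void
    {
        // Never drop below zero, even if a confirm is reported twice.
        $this->inFlight = max(0, $this->inFlight - $count);
    }

    public function mustWait(): bool
    {
        return $this->inFlight >= $this->maxInFlight;
    }

    public function inFlight(): int
    {
        return $this->inFlight;
    }
}

$window = new ConfirmWindow(3);
$window->onPublish();
$window->onPublish();
$window->onPublish();
$fullBefore = $window->mustWait(); // window is full: wait for confirms

$window->onConfirm(2);
$fullAfter = $window->mustWait();  // two confirms arrived: publishing may resume
```

A window equal to one batch behaves like confirming every batch, a window of several batches like confirming every few batches, and a large window fed by asynchronous handlers approximates the asynchronous strategy while still bounding memory.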

Related Links