Appearance
RabbitMQ 基准测试
概述
基准测试是评估 RabbitMQ 性能的重要手段,通过科学的测试方法可以准确了解系统的吞吐量、延迟和资源消耗,为性能优化提供数据支撑。
核心知识点
基准测试指标
| 指标 | 说明 | 单位 |
|---|---|---|
| 吞吐量 | 单位时间内处理的消息数量 | msg/s |
| 延迟 | 消息从发送到被消费的时间 | ms |
| P99延迟 | 99%消息的最大延迟 | ms |
| 资源消耗 | CPU、内存、网络、磁盘使用率 | % |
测试工具选择
1. RabbitMQ PerfTest
官方性能测试工具,功能强大:
bash
# 安装 PerfTest
wget https://github.com/rabbitmq/rabbitmq-perf-test/releases/download/v2.19.0/rabbitmq-perf-test-linux-bin-2.19.0.tar.gz
tar -xzf rabbitmq-perf-test-linux-bin-2.19.0.tar.gz
# 基础测试
./bin/runjava com.rabbitmq.perf.PerfTest \
-h amqp://guest:guest@localhost:5672 \
-t 4 -s 1024 -x 2 -y 4 -u test_queue2. 参数说明
bash
# PerfTest 常用参数
-h, --uri # RabbitMQ 连接URI
-t, --threads # 工作线程数
-s, --size # 消息大小(字节)
-x, --producers # 生产者数量
-y, --consumers # 消费者数量
-u, --queue # 队列名称
-r, --rate # 发布速率限制
-a, --autoack # 自动确认
-p, --predeclared # 使用已存在的队列测试场景设计
场景一:吞吐量测试
bash
# 高吞吐量测试
./bin/runjava com.rabbitmq.perf.PerfTest \
-h amqp://guest:guest@localhost:5672 \
-t 8 -s 1024 -x 4 -y 8 \
-u throughput_test \
-p \
--producer-scheduler-threads 4 \
--consumers-thread-pools 4场景二:延迟测试
bash
# 低延迟测试
./bin/runjava com.rabbitmq.perf.PerfTest \
-h amqp://guest:guest@localhost:5672 \
-t 2 -s 256 -x 1 -y 1 \
-u latency_test \
--autoack false \
--multi-ack-every 1场景三:持久化测试
bash
# 持久化消息测试
./bin/runjava com.rabbitmq.perf.PerfTest \
-h amqp://guest:guest@localhost:5672 \
-t 4 -s 1024 -x 2 -y 4 \
-u persistent_test \
-f persistent \
--queue-args x-ha-policy=all配置示例
测试环境配置
ini
# /etc/rabbitmq/rabbitmq.conf
# 基础配置
listeners.tcp.default = 5672
management.tcp.port = 15672
# 性能相关配置
channel_max = 2048
connection_max = 65535
# 内存配置
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75
# 磁盘配置
disk_free_limit.relative = 2.0
# 队列配置
queue_master_locator = min-masters测试脚本配置
bash
#!/bin/bash
# benchmark.sh
RABBITMQ_HOST="localhost"
RABBITMQ_PORT="5672"
RABBITMQ_USER="guest"
RABBITMQ_PASS="guest"
run_throughput_test() {
echo "=== 吞吐量测试 ==="
./bin/runjava com.rabbitmq.perf.PerfTest \
-h amqp://${RABBITMQ_USER}:${RABBITMQ_PASS}@${RABBITMQ_HOST}:${RABBITMQ_PORT} \
-t 8 -s 1024 -x 4 -y 8 \
-u throughput_queue \
-p \
--time 60 \
--output-file throughput_results.json
}
run_latency_test() {
echo "=== 延迟测试 ==="
./bin/runjava com.rabbitmq.perf.PerfTest \
-h amqp://${RABBITMQ_USER}:${RABBITMQ_PASS}@${RABBITMQ_HOST}:${RABBITMQ_PORT} \
-t 2 -s 256 -x 1 -y 1 \
-u latency_queue \
--autoack false \
--time 60 \
--output-file latency_results.json
}
run_persistence_test() {
echo "=== 持久化测试 ==="
./bin/runjava com.rabbitmq.perf.PerfTest \
-h amqp://${RABBITMQ_USER}:${RABBITMQ_PASS}@${RABBITMQ_HOST}:${RABBITMQ_PORT} \
-t 4 -s 1024 -x 2 -y 4 \
-u persistent_queue \
-f persistent \
--time 60 \
--output-file persistence_results.json
}
case "$1" in
throughput) run_throughput_test ;;
latency) run_latency_test ;;
persistence) run_persistence_test ;;
all)
run_throughput_test
run_latency_test
run_persistence_test
;;
*) echo "Usage: $0 {throughput|latency|persistence|all}" ;;
esacPHP 代码示例
性能测试类
php
<?php
namespace App\RabbitMQ\Benchmark;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Channel\AMQPChannel;
class RabbitMQBenchmark
{
private AMQPStreamConnection $connection;
private AMQPChannel $channel;
private array $results = [];
public function __construct(string $host, int $port, string $user, string $pass)
{
$this->connection = new AMQPStreamConnection(
$host,
$port,
$user,
$pass,
'/',
false,
'AMQPLAIN',
null,
'en_US',
3.0,
3.0,
null,
true,
60
);
$this->channel = $this->connection->channel();
}
public function runThroughputTest(
string $queueName,
int $messageCount,
int $messageSize,
int $producers = 1,
int $consumers = 1
): array {
$this->declareQueue($queueName);
$message = str_repeat('x', $messageSize);
$startTime = microtime(true);
$producerResults = $this->runProducers(
$queueName,
$message,
$messageCount,
$producers
);
$consumerResults = $this->runConsumers(
$queueName,
$messageCount,
$consumers
);
$endTime = microtime(true);
$totalTime = $endTime - $startTime;
return [
'total_messages' => $messageCount,
'total_time' => round($totalTime, 3),
'throughput' => round($messageCount / $totalTime, 2),
'message_size' => $messageSize,
'producers' => $producers,
'consumers' => $consumers,
'producer_stats' => $producerResults,
'consumer_stats' => $consumerResults,
];
}
public function runLatencyTest(
string $queueName,
int $messageCount,
int $messageSize
): array {
$this->declareQueue($queueName);
$latencies = [];
$message = str_repeat('x', $messageSize);
for ($i = 0; $i < $messageCount; $i++) {
$sendTime = microtime(true);
$msg = new AMQPMessage(
json_encode([
'payload' => $message,
'send_time' => $sendTime,
]),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]
);
$this->channel->basic_publish($msg, '', $queueName);
$this->channel->basic_consume($queueName, '', false, true, false, false,
function ($msg) use (&$latencies) {
$data = json_decode($msg->body, true);
$latency = (microtime(true) - $data['send_time']) * 1000;
$latencies[] = $latency;
}
);
$this->channel->wait();
}
sort($latencies);
return [
'message_count' => $messageCount,
'avg_latency' => round(array_sum($latencies) / count($latencies), 2),
'min_latency' => round(min($latencies), 2),
'max_latency' => round(max($latencies), 2),
'p50_latency' => round($this->percentile($latencies, 50), 2),
'p95_latency' => round($this->percentile($latencies, 95), 2),
'p99_latency' => round($this->percentile($latencies, 99), 2),
];
}
public function runPersistenceTest(
string $queueName,
int $messageCount,
int $messageSize
): array {
$this->declareQueue($queueName, true);
$message = str_repeat('x', $messageSize);
$startTime = microtime(true);
for ($i = 0; $i < $messageCount; $i++) {
$msg = new AMQPMessage(
$message,
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->channel->basic_publish($msg, '', $queueName);
}
$publishTime = microtime(true) - $startTime;
$consumeStart = microtime(true);
$consumed = 0;
$this->channel->basic_consume($queueName, '', false, false, false, false,
function ($msg) use (&$consumed) {
$consumed++;
$msg->ack();
}
);
while ($consumed < $messageCount) {
$this->channel->wait();
}
$consumeTime = microtime(true) - $consumeStart;
$totalTime = microtime(true) - $startTime;
return [
'message_count' => $messageCount,
'message_size' => $messageSize,
'publish_time' => round($publishTime, 3),
'consume_time' => round($consumeTime, 3),
'total_time' => round($totalTime, 3),
'publish_rate' => round($messageCount / $publishTime, 2),
'consume_rate' => round($messageCount / $consumeTime, 2),
];
}
private function declareQueue(string $queueName, bool $durable = false): void
{
$this->channel->queue_declare(
$queueName,
false,
$durable,
false,
false
);
}
private function runProducers(
string $queueName,
string $message,
int $count,
int $producers
): array {
$messagesPerProducer = (int) ($count / $producers);
$results = [];
for ($p = 0; $p < $producers; $p++) {
$startTime = microtime(true);
for ($i = 0; $i < $messagesPerProducer; $i++) {
$msg = new AMQPMessage($message);
$this->channel->basic_publish($msg, '', $queueName);
}
$results[] = [
'producer_id' => $p,
'messages' => $messagesPerProducer,
'time' => round(microtime(true) - $startTime, 3),
];
}
return $results;
}
private function runConsumers(
string $queueName,
int $expectedCount,
int $consumers
): array {
$results = [];
$totalConsumed = 0;
$consumerCounts = array_fill(0, $consumers, 0);
$callback = function ($msg, $consumerId) use (&$consumerCounts, &$totalConsumed) {
$consumerCounts[$consumerId]++;
$totalConsumed++;
$msg->ack();
};
$startTimes = [];
for ($c = 0; $c < $consumers; $c++) {
$startTimes[$c] = microtime(true);
$consumerTag = $this->channel->basic_consume(
$queueName,
'',
false,
false,
false,
false,
function ($msg) use ($callback, $c) {
$callback($msg, $c);
}
);
}
while ($totalConsumed < $expectedCount) {
$this->channel->wait();
}
for ($c = 0; $c < $consumers; $c++) {
$results[] = [
'consumer_id' => $c,
'messages' => $consumerCounts[$c],
'time' => round(microtime(true) - $startTimes[$c], 3),
];
}
return $results;
}
private function percentile(array $data, float $percentile): float
{
$index = (count($data) - 1) * ($percentile / 100);
$lower = floor($index);
$upper = ceil($index);
if ($lower === $upper) {
return $data[$index];
}
return $data[$lower] + ($data[$upper] - $data[$lower]) * ($index - $lower);
}
public function getSystemMetrics(): array
{
return [
'memory_usage' => memory_get_usage(true),
'memory_peak' => memory_get_peak_usage(true),
'connection_stats' => $this->connection->getServerProperties(),
];
}
public function __destruct()
{
if ($this->channel->is_open()) {
$this->channel->close();
}
if ($this->connection->isConnected()) {
$this->connection->close();
}
}
}测试执行脚本
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use App\RabbitMQ\Benchmark\RabbitMQBenchmark;
$benchmark = new RabbitMQBenchmark(
'localhost',
5672,
'guest',
'guest'
);
echo "=== RabbitMQ 性能基准测试 ===\n\n";
echo "1. 吞吐量测试\n";
$throughputResults = $benchmark->runThroughputTest(
'benchmark_throughput',
10000,
1024,
2,
4
);
print_r($throughputResults);
echo "\n2. 延迟测试\n";
$latencyResults = $benchmark->runLatencyTest(
'benchmark_latency',
1000,
256
);
print_r($latencyResults);
echo "\n3. 持久化测试\n";
$persistenceResults = $benchmark->runPersistenceTest(
'benchmark_persistent',
5000,
1024
);
print_r($persistenceResults);
echo "\n4. 系统指标\n";
$systemMetrics = $benchmark->getSystemMetrics();
print_r($systemMetrics);结果分析类
php
<?php
namespace App\RabbitMQ\Benchmark;
class BenchmarkAnalyzer
{
public function analyzeResults(array $results): array
{
$analysis = [];
foreach ($results as $testName => $data) {
$analysis[$testName] = [
'score' => $this->calculateScore($data),
'bottleneck' => $this->identifyBottleneck($data),
'recommendations' => $this->getRecommendations($data),
];
}
return $analysis;
}
private function calculateScore(array $data): string
{
if (isset($data['throughput'])) {
if ($data['throughput'] > 50000) return '优秀';
if ($data['throughput'] > 20000) return '良好';
if ($data['throughput'] > 10000) return '一般';
return '需优化';
}
if (isset($data['avg_latency'])) {
if ($data['avg_latency'] < 1) return '优秀';
if ($data['avg_latency'] < 5) return '良好';
if ($data['avg_latency'] < 20) return '一般';
return '需优化';
}
return '未知';
}
private function identifyBottleneck(array $data): string
{
if (isset($data['p99_latency']) && $data['p99_latency'] > $data['avg_latency'] * 3) {
return '延迟波动大,可能存在GC或网络问题';
}
if (isset($data['producer_stats']) && isset($data['consumer_stats'])) {
$producerRate = $this->calculateAverageRate($data['producer_stats']);
$consumerRate = $this->calculateAverageRate($data['consumer_stats']);
if ($producerRate > $consumerRate * 1.5) {
return '消费者处理能力不足';
}
if ($consumerRate > $producerRate * 1.5) {
return '生产者发送能力不足';
}
}
return '未发现明显瓶颈';
}
private function getRecommendations(array $data): array
{
$recommendations = [];
if (isset($data['throughput']) && $data['throughput'] < 20000) {
$recommendations[] = '增加消费者数量';
$recommendations[] = '使用批量确认';
$recommendations[] = '调整预取计数';
}
if (isset($data['p99_latency']) && $data['p99_latency'] > 10) {
$recommendations[] = '减少消息大小';
$recommendations[] = '使用非持久化消息(如允许)';
$recommendations[] = '优化网络配置';
}
return $recommendations;
}
private function calculateAverageRate(array $stats): float
{
$totalRate = 0;
foreach ($stats as $stat) {
$totalRate += $stat['messages'] / $stat['time'];
}
return $totalRate / count($stats);
}
}实际应用场景
场景一:新系统上线前压测
php
<?php
class PreLaunchBenchmark
{
private RabbitMQBenchmark $benchmark;
public function runFullBenchmark(): array
{
$results = [];
$results['light_load'] = $this->benchmark->runThroughputTest(
'pre_launch_light',
1000,
512,
1,
1
);
$results['normal_load'] = $this->benchmark->runThroughputTest(
'pre_launch_normal',
10000,
1024,
2,
4
);
$results['peak_load'] = $this->benchmark->runThroughputTest(
'pre_launch_peak',
50000,
1024,
4,
8
);
return $this->generateReport($results);
}
private function generateReport(array $results): array
{
return [
'timestamp' => date('Y-m-d H:i:s'),
'results' => $results,
'summary' => [
'max_throughput' => max(array_column($results, 'throughput')),
'min_latency' => min(array_column($results, 'avg_latency') ?: [0]),
],
'passed' => $this->evaluateResults($results),
];
}
private function evaluateResults(array $results): bool
{
$peakThroughput = $results['peak_load']['throughput'] ?? 0;
return $peakThroughput >= 20000;
}
}场景二:性能对比测试
php
<?php
class ComparisonBenchmark
{
public function compareConfigurations(array $configs): array
{
$results = [];
foreach ($configs as $name => $config) {
$benchmark = new RabbitMQBenchmark(
$config['host'],
$config['port'],
$config['user'],
$config['pass']
);
$results[$name] = $benchmark->runThroughputTest(
"compare_{$name}",
10000,
1024,
2,
4
);
}
return $this->compareResults($results);
}
private function compareResults(array $results): array
{
$comparison = [];
$baseline = array_key_first($results);
foreach ($results as $name => $data) {
$comparison[$name] = [
'throughput' => $data['throughput'],
'relative_to_baseline' => $name === $baseline
? '100%'
: round(($data['throughput'] / $results[$baseline]['throughput']) * 100, 1) . '%',
];
}
return $comparison;
}
}常见问题与解决方案
问题一:测试结果不稳定
原因分析:
- 系统资源竞争
- 网络波动
- GC 停顿
解决方案:
php
<?php
class StableBenchmark
{
public function runWithWarmup(
string $queueName,
int $warmupMessages,
int $testMessages
): array {
$benchmark = new RabbitMQBenchmark('localhost', 5672, 'guest', 'guest');
echo "预热阶段...\n";
$benchmark->runThroughputTest($queueName, $warmupMessages, 1024, 1, 1);
echo "正式测试...\n";
$results = [];
for ($i = 0; $i < 3; $i++) {
$results[] = $benchmark->runThroughputTest(
$queueName . "_{$i}",
$testMessages,
1024,
2,
4
);
}
return $this->calculateStableResult($results);
}
private function calculateStableResult(array $results): array
{
$throughputs = array_column($results, 'throughput');
sort($throughputs);
$trimmed = array_slice($throughputs, 1, -1);
return [
'avg_throughput' => array_sum($trimmed) / count($trimmed),
'min_throughput' => min($trimmed),
'max_throughput' => max($trimmed),
'variance' => $this->calculateVariance($trimmed),
];
}
private function calculateVariance(array $data): float
{
$mean = array_sum($data) / count($data);
$variance = 0;
foreach ($data as $value) {
$variance += pow($value - $mean, 2);
}
return $variance / count($data);
}
}问题二:测试环境资源不足
解决方案:
bash
# 检查系统资源
free -h
df -h
cat /proc/cpuinfo | grep processor | wc -l
# RabbitMQ 状态检查
rabbitmqctl status
rabbitmqctl node_health_check最佳实践建议
测试环境准备
- 隔离测试环境:避免生产环境影响
- 资源充足:确保 CPU、内存、磁盘充足
- 网络稳定:使用本地或低延迟网络
- 预热系统:测试前运行预热消息
测试方法
- 多次测试取平均:消除偶然因素
- 逐步加压:从小负载开始逐步增加
- 监控资源:同时监控系统资源使用
- 记录环境:记录测试时的配置和环境
结果分析
- 关注 P99 延迟:比平均值更能反映用户体验
- 分析瓶颈:找出限制性能的因素
- 对比基准:与历史结果对比发现变化
- 持续测试:定期进行基准测试
