Skip to content

RabbitMQ 基准测试

概述

基准测试是评估 RabbitMQ 性能的重要手段,通过科学的测试方法可以准确了解系统的吞吐量、延迟和资源消耗,为性能优化提供数据支撑。

核心知识点

基准测试指标

指标说明单位
吞吐量单位时间内处理的消息数量msg/s
延迟消息从发送到被消费的时间ms
P99延迟99%消息的最大延迟ms
资源消耗CPU、内存、网络、磁盘使用率%

测试工具选择

1. RabbitMQ PerfTest

官方性能测试工具,功能强大:

bash
# 安装 PerfTest
wget https://github.com/rabbitmq/rabbitmq-perf-test/releases/download/v2.19.0/rabbitmq-perf-test-linux-bin-2.19.0.tar.gz
tar -xzf rabbitmq-perf-test-linux-bin-2.19.0.tar.gz

# 基础测试
./bin/runjava com.rabbitmq.perf.PerfTest \
  -h amqp://guest:guest@localhost:5672 \
  -t 4 -s 1024 -x 2 -y 4 -u test_queue

2. 参数说明

bash
# PerfTest 常用参数
-h, --uri          # RabbitMQ 连接URI
-t, --threads      # 工作线程数
-s, --size         # 消息大小(字节)
-x, --producers    # 生产者数量
-y, --consumers    # 消费者数量
-u, --queue        # 队列名称
-r, --rate         # 发布速率限制
-a, --autoack      # 自动确认
-p, --predeclared  # 使用已存在的队列

测试场景设计

场景一:吞吐量测试

bash
# 高吞吐量测试
./bin/runjava com.rabbitmq.perf.PerfTest \
  -h amqp://guest:guest@localhost:5672 \
  -t 8 -s 1024 -x 4 -y 8 \
  -u throughput_test \
  -p \
  --producer-scheduler-threads 4 \
  --consumers-thread-pools 4

场景二:延迟测试

bash
# 低延迟测试
./bin/runjava com.rabbitmq.perf.PerfTest \
  -h amqp://guest:guest@localhost:5672 \
  -t 2 -s 256 -x 1 -y 1 \
  -u latency_test \
  --autoack false \
  --multi-ack-every 1

场景三:持久化测试

bash
# 持久化消息测试
./bin/runjava com.rabbitmq.perf.PerfTest \
  -h amqp://guest:guest@localhost:5672 \
  -t 4 -s 1024 -x 2 -y 4 \
  -u persistent_test \
  -f persistent \
  --queue-args x-ha-policy=all

配置示例

测试环境配置

ini
# /etc/rabbitmq/rabbitmq.conf

# 基础配置
listeners.tcp.default = 5672
management.tcp.port = 15672

# 性能相关配置
channel_max = 2048
connection_max = 65535

# 内存配置
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75

# 磁盘配置
disk_free_limit.relative = 2.0

# 队列配置
queue_master_locator = min-masters

测试脚本配置

bash
#!/bin/bash
# benchmark.sh

RABBITMQ_HOST="localhost"
RABBITMQ_PORT="5672"
RABBITMQ_USER="guest"
RABBITMQ_PASS="guest"

run_throughput_test() {
    echo "=== 吞吐量测试 ==="
    ./bin/runjava com.rabbitmq.perf.PerfTest \
        -h amqp://${RABBITMQ_USER}:${RABBITMQ_PASS}@${RABBITMQ_HOST}:${RABBITMQ_PORT} \
        -t 8 -s 1024 -x 4 -y 8 \
        -u throughput_queue \
        -p \
        --time 60 \
        --output-file throughput_results.json
}

run_latency_test() {
    echo "=== 延迟测试 ==="
    ./bin/runjava com.rabbitmq.perf.PerfTest \
        -h amqp://${RABBITMQ_USER}:${RABBITMQ_PASS}@${RABBITMQ_HOST}:${RABBITMQ_PORT} \
        -t 2 -s 256 -x 1 -y 1 \
        -u latency_queue \
        --autoack false \
        --time 60 \
        --output-file latency_results.json
}

run_persistence_test() {
    echo "=== 持久化测试 ==="
    ./bin/runjava com.rabbitmq.perf.PerfTest \
        -h amqp://${RABBITMQ_USER}:${RABBITMQ_PASS}@${RABBITMQ_HOST}:${RABBITMQ_PORT} \
        -t 4 -s 1024 -x 2 -y 4 \
        -u persistent_queue \
        -f persistent \
        --time 60 \
        --output-file persistence_results.json
}

case "$1" in
    throughput) run_throughput_test ;;
    latency) run_latency_test ;;
    persistence) run_persistence_test ;;
    all)
        run_throughput_test
        run_latency_test
        run_persistence_test
        ;;
    *) echo "Usage: $0 {throughput|latency|persistence|all}" ;;
esac

PHP 代码示例

性能测试类

php
<?php

namespace App\RabbitMQ\Benchmark;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Channel\AMQPChannel;

class RabbitMQBenchmark
{
    private AMQPStreamConnection $connection;
    private AMQPChannel $channel;
    private array $results = [];

    public function __construct(string $host, int $port, string $user, string $pass)
    {
        $this->connection = new AMQPStreamConnection(
            $host,
            $port,
            $user,
            $pass,
            '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            3.0,
            null,
            true,
            60
        );
        $this->channel = $this->connection->channel();
    }

    public function runThroughputTest(
        string $queueName,
        int $messageCount,
        int $messageSize,
        int $producers = 1,
        int $consumers = 1
    ): array {
        $this->declareQueue($queueName);
        
        $message = str_repeat('x', $messageSize);
        
        $startTime = microtime(true);
        
        $producerResults = $this->runProducers(
            $queueName,
            $message,
            $messageCount,
            $producers
        );
        
        $consumerResults = $this->runConsumers(
            $queueName,
            $messageCount,
            $consumers
        );
        
        $endTime = microtime(true);
        $totalTime = $endTime - $startTime;
        
        return [
            'total_messages' => $messageCount,
            'total_time' => round($totalTime, 3),
            'throughput' => round($messageCount / $totalTime, 2),
            'message_size' => $messageSize,
            'producers' => $producers,
            'consumers' => $consumers,
            'producer_stats' => $producerResults,
            'consumer_stats' => $consumerResults,
        ];
    }

    public function runLatencyTest(
        string $queueName,
        int $messageCount,
        int $messageSize
    ): array {
        $this->declareQueue($queueName);
        
        $latencies = [];
        $message = str_repeat('x', $messageSize);
        
        for ($i = 0; $i < $messageCount; $i++) {
            $sendTime = microtime(true);
            
            $msg = new AMQPMessage(
                json_encode([
                    'payload' => $message,
                    'send_time' => $sendTime,
                ]),
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]
            );
            
            $this->channel->basic_publish($msg, '', $queueName);
            
            $this->channel->basic_consume($queueName, '', false, true, false, false, 
                function ($msg) use (&$latencies) {
                    $data = json_decode($msg->body, true);
                    $latency = (microtime(true) - $data['send_time']) * 1000;
                    $latencies[] = $latency;
                }
            );
            
            $this->channel->wait();
        }
        
        sort($latencies);
        
        return [
            'message_count' => $messageCount,
            'avg_latency' => round(array_sum($latencies) / count($latencies), 2),
            'min_latency' => round(min($latencies), 2),
            'max_latency' => round(max($latencies), 2),
            'p50_latency' => round($this->percentile($latencies, 50), 2),
            'p95_latency' => round($this->percentile($latencies, 95), 2),
            'p99_latency' => round($this->percentile($latencies, 99), 2),
        ];
    }

    public function runPersistenceTest(
        string $queueName,
        int $messageCount,
        int $messageSize
    ): array {
        $this->declareQueue($queueName, true);
        
        $message = str_repeat('x', $messageSize);
        $startTime = microtime(true);
        
        for ($i = 0; $i < $messageCount; $i++) {
            $msg = new AMQPMessage(
                $message,
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
            );
            
            $this->channel->basic_publish($msg, '', $queueName);
        }
        
        $publishTime = microtime(true) - $startTime;
        
        $consumeStart = microtime(true);
        $consumed = 0;
        
        $this->channel->basic_consume($queueName, '', false, false, false, false,
            function ($msg) use (&$consumed) {
                $consumed++;
                $msg->ack();
            }
        );
        
        while ($consumed < $messageCount) {
            $this->channel->wait();
        }
        
        $consumeTime = microtime(true) - $consumeStart;
        $totalTime = microtime(true) - $startTime;
        
        return [
            'message_count' => $messageCount,
            'message_size' => $messageSize,
            'publish_time' => round($publishTime, 3),
            'consume_time' => round($consumeTime, 3),
            'total_time' => round($totalTime, 3),
            'publish_rate' => round($messageCount / $publishTime, 2),
            'consume_rate' => round($messageCount / $consumeTime, 2),
        ];
    }

    private function declareQueue(string $queueName, bool $durable = false): void
    {
        $this->channel->queue_declare(
            $queueName,
            false,
            $durable,
            false,
            false
        );
    }

    private function runProducers(
        string $queueName,
        string $message,
        int $count,
        int $producers
    ): array {
        $messagesPerProducer = (int) ($count / $producers);
        $results = [];
        
        for ($p = 0; $p < $producers; $p++) {
            $startTime = microtime(true);
            
            for ($i = 0; $i < $messagesPerProducer; $i++) {
                $msg = new AMQPMessage($message);
                $this->channel->basic_publish($msg, '', $queueName);
            }
            
            $results[] = [
                'producer_id' => $p,
                'messages' => $messagesPerProducer,
                'time' => round(microtime(true) - $startTime, 3),
            ];
        }
        
        return $results;
    }

    private function runConsumers(
        string $queueName,
        int $expectedCount,
        int $consumers
    ): array {
        $results = [];
        $totalConsumed = 0;
        $consumerCounts = array_fill(0, $consumers, 0);
        
        $callback = function ($msg, $consumerId) use (&$consumerCounts, &$totalConsumed) {
            $consumerCounts[$consumerId]++;
            $totalConsumed++;
            $msg->ack();
        };
        
        $startTimes = [];
        for ($c = 0; $c < $consumers; $c++) {
            $startTimes[$c] = microtime(true);
            $consumerTag = $this->channel->basic_consume(
                $queueName,
                '',
                false,
                false,
                false,
                false,
                function ($msg) use ($callback, $c) {
                    $callback($msg, $c);
                }
            );
        }
        
        while ($totalConsumed < $expectedCount) {
            $this->channel->wait();
        }
        
        for ($c = 0; $c < $consumers; $c++) {
            $results[] = [
                'consumer_id' => $c,
                'messages' => $consumerCounts[$c],
                'time' => round(microtime(true) - $startTimes[$c], 3),
            ];
        }
        
        return $results;
    }

    private function percentile(array $data, float $percentile): float
    {
        $index = (count($data) - 1) * ($percentile / 100);
        $lower = floor($index);
        $upper = ceil($index);
        
        if ($lower === $upper) {
            return $data[$index];
        }
        
        return $data[$lower] + ($data[$upper] - $data[$lower]) * ($index - $lower);
    }

    public function getSystemMetrics(): array
    {
        return [
            'memory_usage' => memory_get_usage(true),
            'memory_peak' => memory_get_peak_usage(true),
            'connection_stats' => $this->connection->getServerProperties(),
        ];
    }

    public function __destruct()
    {
        if ($this->channel->is_open()) {
            $this->channel->close();
        }
        if ($this->connection->isConnected()) {
            $this->connection->close();
        }
    }
}

测试执行脚本

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use App\RabbitMQ\Benchmark\RabbitMQBenchmark;

$benchmark = new RabbitMQBenchmark(
    'localhost',
    5672,
    'guest',
    'guest'
);

echo "=== RabbitMQ 性能基准测试 ===\n\n";

echo "1. 吞吐量测试\n";
$throughputResults = $benchmark->runThroughputTest(
    'benchmark_throughput',
    10000,
    1024,
    2,
    4
);
print_r($throughputResults);

echo "\n2. 延迟测试\n";
$latencyResults = $benchmark->runLatencyTest(
    'benchmark_latency',
    1000,
    256
);
print_r($latencyResults);

echo "\n3. 持久化测试\n";
$persistenceResults = $benchmark->runPersistenceTest(
    'benchmark_persistent',
    5000,
    1024
);
print_r($persistenceResults);

echo "\n4. 系统指标\n";
$systemMetrics = $benchmark->getSystemMetrics();
print_r($systemMetrics);

结果分析类

php
<?php

namespace App\RabbitMQ\Benchmark;

class BenchmarkAnalyzer
{
    public function analyzeResults(array $results): array
    {
        $analysis = [];
        
        foreach ($results as $testName => $data) {
            $analysis[$testName] = [
                'score' => $this->calculateScore($data),
                'bottleneck' => $this->identifyBottleneck($data),
                'recommendations' => $this->getRecommendations($data),
            ];
        }
        
        return $analysis;
    }

    private function calculateScore(array $data): string
    {
        if (isset($data['throughput'])) {
            if ($data['throughput'] > 50000) return '优秀';
            if ($data['throughput'] > 20000) return '良好';
            if ($data['throughput'] > 10000) return '一般';
            return '需优化';
        }
        
        if (isset($data['avg_latency'])) {
            if ($data['avg_latency'] < 1) return '优秀';
            if ($data['avg_latency'] < 5) return '良好';
            if ($data['avg_latency'] < 20) return '一般';
            return '需优化';
        }
        
        return '未知';
    }

    private function identifyBottleneck(array $data): string
    {
        if (isset($data['p99_latency']) && $data['p99_latency'] > $data['avg_latency'] * 3) {
            return '延迟波动大,可能存在GC或网络问题';
        }
        
        if (isset($data['producer_stats']) && isset($data['consumer_stats'])) {
            $producerRate = $this->calculateAverageRate($data['producer_stats']);
            $consumerRate = $this->calculateAverageRate($data['consumer_stats']);
            
            if ($producerRate > $consumerRate * 1.5) {
                return '消费者处理能力不足';
            }
            if ($consumerRate > $producerRate * 1.5) {
                return '生产者发送能力不足';
            }
        }
        
        return '未发现明显瓶颈';
    }

    private function getRecommendations(array $data): array
    {
        $recommendations = [];
        
        if (isset($data['throughput']) && $data['throughput'] < 20000) {
            $recommendations[] = '增加消费者数量';
            $recommendations[] = '使用批量确认';
            $recommendations[] = '调整预取计数';
        }
        
        if (isset($data['p99_latency']) && $data['p99_latency'] > 10) {
            $recommendations[] = '减少消息大小';
            $recommendations[] = '使用非持久化消息(如允许)';
            $recommendations[] = '优化网络配置';
        }
        
        return $recommendations;
    }

    private function calculateAverageRate(array $stats): float
    {
        $totalRate = 0;
        foreach ($stats as $stat) {
            $totalRate += $stat['messages'] / $stat['time'];
        }
        return $totalRate / count($stats);
    }
}

实际应用场景

场景一:新系统上线前压测

php
<?php

class PreLaunchBenchmark
{
    private RabbitMQBenchmark $benchmark;
    
    public function runFullBenchmark(): array
    {
        $results = [];
        
        $results['light_load'] = $this->benchmark->runThroughputTest(
            'pre_launch_light',
            1000,
            512,
            1,
            1
        );
        
        $results['normal_load'] = $this->benchmark->runThroughputTest(
            'pre_launch_normal',
            10000,
            1024,
            2,
            4
        );
        
        $results['peak_load'] = $this->benchmark->runThroughputTest(
            'pre_launch_peak',
            50000,
            1024,
            4,
            8
        );
        
        return $this->generateReport($results);
    }

    private function generateReport(array $results): array
    {
        return [
            'timestamp' => date('Y-m-d H:i:s'),
            'results' => $results,
            'summary' => [
                'max_throughput' => max(array_column($results, 'throughput')),
                'min_latency' => min(array_column($results, 'avg_latency') ?: [0]),
            ],
            'passed' => $this->evaluateResults($results),
        ];
    }

    private function evaluateResults(array $results): bool
    {
        $peakThroughput = $results['peak_load']['throughput'] ?? 0;
        return $peakThroughput >= 20000;
    }
}

场景二:性能对比测试

php
<?php

class ComparisonBenchmark
{
    public function compareConfigurations(array $configs): array
    {
        $results = [];
        
        foreach ($configs as $name => $config) {
            $benchmark = new RabbitMQBenchmark(
                $config['host'],
                $config['port'],
                $config['user'],
                $config['pass']
            );
            
            $results[$name] = $benchmark->runThroughputTest(
                "compare_{$name}",
                10000,
                1024,
                2,
                4
            );
        }
        
        return $this->compareResults($results);
    }

    private function compareResults(array $results): array
    {
        $comparison = [];
        $baseline = array_key_first($results);
        
        foreach ($results as $name => $data) {
            $comparison[$name] = [
                'throughput' => $data['throughput'],
                'relative_to_baseline' => $name === $baseline 
                    ? '100%' 
                    : round(($data['throughput'] / $results[$baseline]['throughput']) * 100, 1) . '%',
            ];
        }
        
        return $comparison;
    }
}

常见问题与解决方案

问题一:测试结果不稳定

原因分析

  • 系统资源竞争
  • 网络波动
  • GC 停顿

解决方案

php
<?php

class StableBenchmark
{
    public function runWithWarmup(
        string $queueName,
        int $warmupMessages,
        int $testMessages
    ): array {
        $benchmark = new RabbitMQBenchmark('localhost', 5672, 'guest', 'guest');
        
        echo "预热阶段...\n";
        $benchmark->runThroughputTest($queueName, $warmupMessages, 1024, 1, 1);
        
        echo "正式测试...\n";
        $results = [];
        for ($i = 0; $i < 3; $i++) {
            $results[] = $benchmark->runThroughputTest(
                $queueName . "_{$i}",
                $testMessages,
                1024,
                2,
                4
            );
        }
        
        return $this->calculateStableResult($results);
    }

    private function calculateStableResult(array $results): array
    {
        $throughputs = array_column($results, 'throughput');
        sort($throughputs);
        
        $trimmed = array_slice($throughputs, 1, -1);
        
        return [
            'avg_throughput' => array_sum($trimmed) / count($trimmed),
            'min_throughput' => min($trimmed),
            'max_throughput' => max($trimmed),
            'variance' => $this->calculateVariance($trimmed),
        ];
    }

    private function calculateVariance(array $data): float
    {
        $mean = array_sum($data) / count($data);
        $variance = 0;
        foreach ($data as $value) {
            $variance += pow($value - $mean, 2);
        }
        return $variance / count($data);
    }
}

问题二:测试环境资源不足

解决方案

bash
# 检查系统资源
free -h
df -h
cat /proc/cpuinfo | grep processor | wc -l

# RabbitMQ 状态检查
rabbitmqctl status
rabbitmqctl node_health_check

最佳实践建议

测试环境准备

  1. 隔离测试环境:避免生产环境影响
  2. 资源充足:确保 CPU、内存、磁盘充足
  3. 网络稳定:使用本地或低延迟网络
  4. 预热系统:测试前运行预热消息

测试方法

  1. 多次测试取平均:消除偶然因素
  2. 逐步加压:从小负载开始逐步增加
  3. 监控资源:同时监控系统资源使用
  4. 记录环境:记录测试时的配置和环境

结果分析

  1. 关注 P99 延迟:比平均值更能反映用户体验
  2. 分析瓶颈:找出限制性能的因素
  3. 对比基准:与历史结果对比发现变化
  4. 持续测试:定期进行基准测试

相关链接