
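RabbitMQ Erlang VM Tuning

Overview

RabbitMQ is built on Erlang/OTP, so tuning the Erlang virtual machine (BEAM) directly affects RabbitMQ performance. Configuring the schedulers, memory allocators and garbage collection (GC) parameters to suit the workload can noticeably improve RabbitMQ's throughput and response times.

Core concepts

Erlang VM architecture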

┌──────────────────────────────────────────────────────────┐
│                     Erlang VM (BEAM)                     │
├──────────────────────────────────────────────────────────┤
│  ┌───────────┐ ┌───────────┐ ┌───────────┐               │
│  │Scheduler 1│ │Scheduler 2│ │Scheduler N│  ...          │
│  └─────┬─────┘ └─────┬─────┘ └─────┬─────┘               │
│        │             │             │                     │
│  ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐               │
│  │ Run queue │ │ Run queue │ │ Run queue │               │
│  └───────────┘ └───────────┘ └───────────┘               │
│                                                          │
│  ┌──────────────────┐ ┌──────────────────┐               │
│  │ Dirty CPU scheds │ │ Dirty I/O scheds │               │
│  └──────────────────┘ └──────────────────┘               │
│                                                          │
│  ┌──────────────────────────────────────────────────┐    │
│  │                Memory allocators                 │    │
│  │  ┌────────────┐ ┌────────────┐ ┌───────────┐     │    │
│  │  │binary_alloc│ │eheap_alloc │ │ ets_alloc │ ... │    │
│  │  └────────────┘ └────────────┘ └───────────┘     │    │
│  └──────────────────────────────────────────────────┘    │
└──────────────────────────────────────────────────────────┘

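Scheduler configuration

1. Scheduler types

| Type | What it runs | When it matters |
| --- | --- | --- |
| Normal schedulers | Erlang process code | Always: queues, channels and connections are Erlang processes |
| Dirty CPU schedulers | Long-running, CPU-bound native code (NIFs/BIFs) | CPU-heavy native work |
| Dirty I/O schedulers | Native code that performs blocking I/O | I/O-heavy work, including file I/O on OTP 21+ |

Binding schedulers to specific CPUs is a setting of the normal schedulers, not a separate scheduler type; see section 3 below.

2. Number of schedulers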

bash
# Show the current scheduler configuration
rabbitmqctl eval 'erlang:system_info(schedulers).'
rabbitmqctl eval 'erlang:system_info(schedulers_online).'

# Recommended:
# total schedulers  = number of CPU cores
# online schedulers = number of CPU cores
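The recommendation above maps directly onto the +S flag (Total:Online). As a minimal sketch, the hypothetical helper below builds that flag from a core count supplied by the caller. Setting the dirty CPU schedulers (+SDcpu) to the same count mirrors the complete configuration file later in this article and is an assumption here, not a requirement.

```php
<?php

/**
 * Hypothetical helper: build "+S N:N +SDcpu N:N" for N CPU cores.
 * +S is Total:Online normal schedulers; +SDcpu is Total:Online dirty CPU schedulers.
 */
function schedulerFlags(int $cpuCores): string
{
    if ($cpuCores < 1) {
        throw new InvalidArgumentException('cpuCores must be at least 1');
    }

    // %1$d reuses the first argument for every placeholder
    return sprintf('+S %1$d:%1$d +SDcpu %1$d:%1$d', $cpuCores);
}

echo schedulerFlags(8), PHP_EOL; // prints "+S 8:8 +SDcpu 8:8"
```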

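3. Scheduler binding

bash
# Show the requested bind type (erlang:system_info(scheduler_bindings) shows the actual bindings)
rabbitmqctl eval 'erlang:system_info(scheduler_bind_type).'

# Bind types
# unbound: schedulers are not bound to CPUs (the Erlang VM default)
# no_spread, thread_spread, processor_spread, spread: ways of spreading schedulers over the CPU topology
# no_node_thread_spread, no_node_processor_spread, thread_no_node_processor_spread: NUMA-aware variants
# default_bind (+sbt db / +stbt db): currently thread_no_node_processor_spread; RabbitMQ passes +stbt db by default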
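The +sbt/+stbt flags take short codes, while erlang:system_info(scheduler_bind_type) prints the long names. The sketch below maps one to the other using the codes listed in the erl documentation. Both helper functions are hypothetical. Because scheduler_bind_type reports the *requested* type, the second helper only tells you whether binding was requested, not whether it succeeded.

```php
<?php

// Short codes accepted by +sbt / +stbt and the bind type names they stand for.
const SCHEDULER_BIND_TYPES = [
    'u'     => 'unbound',
    'ns'    => 'no_spread',
    'ts'    => 'thread_spread',
    'ps'    => 'processor_spread',
    's'     => 'spread',
    'nnts'  => 'no_node_thread_spread',
    'nnps'  => 'no_node_processor_spread',
    'tnnps' => 'thread_no_node_processor_spread',
    'db'    => 'default_bind',
];

function bindTypeName(string $code): string
{
    if (!array_key_exists($code, SCHEDULER_BIND_TYPES)) {
        throw new InvalidArgumentException("Unknown bind type code: {$code}");
    }

    return SCHEDULER_BIND_TYPES[$code];
}

// $evalOutput is the raw output of
// rabbitmqctl eval 'erlang:system_info(scheduler_bind_type).'
function bindingRequested(string $evalOutput): bool
{
    return trim($evalOutput) !== 'unbound';
}
```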

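Memory allocators

1. Allocator types

bash
# Show allocator information
rabbitmqctl eval 'erlang:system_info(allocator).'

| Allocator | Used for | Tuning priority |
| --- | --- | --- |
| temp_alloc | Temporary allocations | Usually needs no tuning |
| sl_alloc | Short-lived memory blocks | Rarely a concern |
| std_alloc | General-purpose allocations not handled by a dedicated allocator | Core tuning target |
| eheap_alloc | Erlang process heaps | Core tuning target |
| binary_alloc | Binary data such as message payloads | Important with large messages |
| ets_alloc | ETS tables | Important with many queues |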
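The table names allocators, but emulator flags refer to them by a single letter: +M<letter>as <strategy> sets an allocator's allocation strategy. The sketch below maps the two using the letters from the erts_alloc documentation. The helper itself is hypothetical. The mapping also shows that ets_alloc is E, not S, because S belongs to sl_alloc.

```php
<?php

// Flag letters used in +M<letter><parameter> emulator flags.
const ALLOCATOR_FLAG_LETTERS = [
    'binary_alloc' => 'B',
    'std_alloc'    => 'D',
    'ets_alloc'    => 'E',
    'eheap_alloc'  => 'H',
    'll_alloc'     => 'L',
    'driver_alloc' => 'R',
    'sl_alloc'     => 'S',
    'temp_alloc'   => 'T',
];

// Values accepted by the "as" (allocation strategy) parameter.
const ALLOCATION_STRATEGIES = [
    'bf', 'aobf', 'aoff', 'aoffcbf', 'aoffcaobf',
    'ageffcaoff', 'ageffcbf', 'ageffcaobf', 'gf', 'af',
];

function allocatorStrategyFlag(string $allocator, string $strategy): string
{
    if (!array_key_exists($allocator, ALLOCATOR_FLAG_LETTERS)) {
        throw new InvalidArgumentException("Unknown allocator: {$allocator}");
    }
    if (!in_array($strategy, ALLOCATION_STRATEGIES, true)) {
        throw new InvalidArgumentException("Unknown strategy: {$strategy}");
    }

    return '+M' . ALLOCATOR_FLAG_LETTERS[$allocator] . 'as ' . $strategy;
}

echo allocatorStrategyFlag('ets_alloc', 'ageffcbf'), PHP_EOL; // prints "+MEas ageffcbf"
```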

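2. Allocation strategies

| Strategy (value of +M<letter>as) | Meaning | Typical use |
| --- | --- | --- |
| aoffcbf | Address order first fit for carriers, best fit within a carrier | Default; general purpose |
| ageffcbf | Oldest carrier first (age order), best fit within a carrier | Reducing fragmentation |
| bf | Best fit | Memory-constrained systems |
| aobf / aoff | Address order best fit / address order first fit | Specific cases |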
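The toy model below makes the difference between first fit and best fit concrete by picking a free block for one request. It is not ERTS code: free blocks are hypothetical [address, size] pairs, and the real allocators apply these policies to carriers and blocks with far more bookkeeping.

```php
<?php

// Toy free-list model: each free block is [address, size].

// Address order first fit: the lowest-address block that is large enough.
function addressOrderFirstFit(array $freeBlocks, int $size): ?array
{
    usort($freeBlocks, fn (array $a, array $b): int => $a[0] <=> $b[0]);
    foreach ($freeBlocks as $block) {
        if ($block[1] >= $size) {
            return $block;
        }
    }
    return null;
}

// Best fit: the smallest block that is large enough (ties go to the lower address).
function bestFit(array $freeBlocks, int $size): ?array
{
    $best = null;
    foreach ($freeBlocks as $block) {
        if ($block[1] < $size) {
            continue;
        }
        if ($best === null
            || $block[1] < $best[1]
            || ($block[1] === $best[1] && $block[0] < $best[0])) {
            $best = $block;
        }
    }
    return $best;
}

$free = [[0x3000, 96], [0x1000, 256], [0x2000, 64]];
// For an 80-byte request, first fit takes the 256-byte block at 0x1000,
// while best fit takes the 96-byte block and keeps the large one free.
```

Best fit leaves large blocks available for large requests at the cost of searching more candidates. Address ordering tends to pack allocations toward low addresses, so that high-address areas are more likely to empty out and be released.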

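Garbage collection (GC)

1. GC types

| Type | Triggered when | Characteristics |
| --- | --- | --- |
| Generational (minor) GC | A process's young heap is full | Collects only recently allocated data; cheap |
| Full sweep (major) GC | After fullsweep_after generational GCs, when the old heap runs out of space, or on an explicit erlang:garbage_collect/0 call | Collects the whole process heap; more expensive |

Both kinds run per process. Each Erlang process has its own heap and is collected independently, so there is no VM-wide stop-the-world pause.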
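The simplified model below shows how fullsweep_after interacts with the two GC types: after N generational collections, the next collection is a full sweep, so N = 0 makes every collection a full sweep. The function is hypothetical. It ignores the other full-sweep triggers (the old heap running out of space, explicit calls).

```php
<?php

// Simplified per-process model: with fullsweep_after = N, at most N minor
// collections happen before the next collection becomes a full sweep.
function gcSequence(int $fullsweepAfter, int $collections): array
{
    $sequence = [];
    $minorSinceFullSweep = 0;

    for ($i = 0; $i < $collections; $i++) {
        if ($minorSinceFullSweep >= $fullsweepAfter) {
            $sequence[] = 'full';
            $minorSinceFullSweep = 0;
        } else {
            $sequence[] = 'minor';
            $minorSinceFullSweep++;
        }
    }

    return $sequence;
}

echo implode(',', gcSequence(2, 6)), PHP_EOL; // prints "minor,minor,full,minor,minor,full"
```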

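2. GC parameters

bash
# Show the default fullsweep_after value
rabbitmqctl eval 'erlang:system_info(fullsweep_after).'
# Default output: {fullsweep_after,65535}

# Heap growth: heap sizes follow a Fibonacci-like sequence,
# then grow by about 20% per step once heaps are very large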

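Configuration examples

Environment variable configuration

bash
# /etc/rabbitmq/rabbitmq-env.conf

# Basic settings
RABBITMQ_NODENAME=rabbit@localhost
RABBITMQ_NODE_IP_ADDRESS=0.0.0.0

# Erlang VM flags. Setting RABBITMQ_SERVER_ERL_ARGS replaces
# RabbitMQ's built-in default flags rather than extending them
RABBITMQ_SERVER_ERL_ARGS="+K true +A 128 +P 1048576 \
  +stbt db +zdbbl 128000 \
  +sbwt none +swt very_low"

# Extra flags for high-load deployments (appended to the flags above)
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 8:8 +SDcpu 8:8 \
  +MBas ageffcbf +MHas ageffcbf +MEas ageffcbf \
  +MBlmbcs 512 +MHlmbcs 512 +MMscs 512"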

Complete configuration file

bash
# /etc/rabbitmq/rabbitmq-env.conf
# Read as shell variable assignments; it is not an executable script

# Node settings
RABBITMQ_NODENAME=rabbit@$(hostname -s)
RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
RABBITMQ_NODE_PORT=5672

# Core Erlang VM flags (these replace RabbitMQ's default flags)
RABBITMQ_SERVER_ERL_ARGS="\
+K true \
+A 128 \
+P 1048576 \
+Q 1048576 \
+stbt db \
+zdbbl 128000 \
+sbwt none \
+swt very_low"

# Schedulers
# +S TotalSchedulers:OnlineSchedulers
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="\
+S 16:16 \
+SDcpu 16:16 \
+SDio 8"

# Memory allocators
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS \
+MBas ageffcbf \
+MHas ageffcbf \
+MEas ageffcbf \
+MBlmbcs 512 \
+MHlmbcs 512 \
+MMscs 512"

# GC (optional): +hms sets the default minimum process heap size in words,
# and the ERL_FULLSWEEP_AFTER environment variable sets the default
# fullsweep_after. Both apply to every process, so change them only
# after measuring.

# Log and data directories
RABBITMQ_LOG_BASE=/var/log/rabbitmq
RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia

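Parameter reference

bash
# +K true
# Enable kernel poll (epoll/kqueue) for network I/O

# +A 128
# Async thread pool size. Before OTP 21 it handled file I/O; since OTP 21
# file I/O runs on dirty I/O schedulers, so the pool mainly matters for
# drivers that still use it
# Suggested: CPU cores * 8, up to 128

# +P 1048576
# Maximum number of processes (each connection and channel uses processes)
# Suggested: expected maximum connections * 10

# +Q 1048576
# Maximum number of ports (sockets, files)

# +stbt db
# Try to bind schedulers using default_bind; unlike +sbt, the node still
# starts if binding is not supported

# +zdbbl 128000
# Distribution buffer busy limit (KB)
# Important in clusters, where inter-node traffic goes through this buffer

# +sbwt none
# Scheduler busy-wait threshold; none stops idle schedulers from spinning,
# saving CPU at the cost of some latency

# +swt very_low
# Scheduler wakeup threshold; very_low wakes sleeping schedulers sooner

# +swct very_eager|eager|medium|lazy|very_lazy
# Scheduler wake cleanup threshold

# +S 16:16
# Total schedulers:online schedulers

# +SDcpu 16:16
# Dirty CPU schedulers (total:online)

# +SDio 8
# Number of dirty I/O schedulers

# +MBas ageffcbf
# Allocation strategy for binary_alloc

# +MHas ageffcbf
# Allocation strategy for eheap_alloc (process heaps)

# +MEas ageffcbf
# Allocation strategy for ets_alloc

# +MBlmbcs 512
# Largest multiblock carrier size for binary_alloc (KB)

# +MMscs 512
# Super carrier size for mseg_alloc (MB)

# +hms Size
# Default minimum heap size for processes (words)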
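The "+P = expected maximum connections × 10" rule of thumb can be turned into a small hypothetical helper. The clamp reflects the range the emulator accepts for +P (1024 to 134217727). The multiplier of 10 is the article's heuristic, not a measured value.

```php
<?php

// Hypothetical helper for the "+P = expected max connections * 10" rule of thumb,
// clamped to the range the emulator accepts for +P.
function processLimitFlag(int $expectedConnections, int $processesPerConnection = 10): string
{
    $limit = $expectedConnections * $processesPerConnection;
    $limit = max(1024, min(134217727, $limit));

    return "+P {$limit}";
}

echo processLimitFlag(50000), PHP_EOL; // prints "+P 500000"
```

The VM may round the requested value up (often to a power of two), so erlang:system_info(process_limit) can report a larger limit than the one passed with +P.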

PHP code examples

VM status monitor class

php
<?php

namespace App\RabbitMQ\Monitoring;

/**
 * Reads Erlang VM statistics by running `rabbitmqctl eval` locally.
 * It must run on the broker host, as a user allowed to use rabbitmqctl
 * (that is, with access to the node's Erlang cookie).
 */
class ErlangVMMonitor
{
    private string $rabbitmqctl;

    public function __construct(string $rabbitmqctl = 'rabbitmqctl')
    {
        $this->rabbitmqctl = $rabbitmqctl;
    }

    public function getSchedulerInfo(): array
    {
        $commands = [
            'schedulers' => 'erlang:system_info(schedulers).',
            'schedulers_online' => 'erlang:system_info(schedulers_online).',
            'scheduler_bind_type' => 'erlang:system_info(scheduler_bind_type).',
            'dirty_cpu_schedulers' => 'erlang:system_info(dirty_cpu_schedulers).',
            'dirty_io_schedulers' => 'erlang:system_info(dirty_io_schedulers).',
        ];

        $results = [];
        foreach ($commands as $key => $command) {
            $results[$key] = $this->executeErlangCommand($command);
        }

        return $results;
    }

    public function getMemoryInfo(): array
    {
        $commands = [
            'total_memory' => 'erlang:memory(total).',
            'processes_memory' => 'erlang:memory(processes).',
            'processes_used' => 'erlang:memory(processes_used).',
            'binary_memory' => 'erlang:memory(binary).',
            'ets_memory' => 'erlang:memory(ets).',
            'atom_memory' => 'erlang:memory(atom).',
            'code_memory' => 'erlang:memory(code).',
        ];

        $results = [];
        foreach ($commands as $key => $command) {
            $results[$key] = $this->parseMemoryValue(
                $this->executeErlangCommand($command)
            );
        }

        return $results;
    }

    public function getProcessInfo(): array
    {
        $commands = [
            'process_count' => 'erlang:system_info(process_count).',
            'process_limit' => 'erlang:system_info(process_limit).',
            'port_count' => 'erlang:system_info(port_count).',
            'port_limit' => 'erlang:system_info(port_limit).',
        ];

        $results = [];
        foreach ($commands as $key => $command) {
            $results[$key] = $this->toInt($this->executeErlangCommand($command));
        }

        // Avoid division by zero when the limit could not be read
        $results['process_usage_percent'] = $results['process_limit'] > 0
            ? round($results['process_count'] / $results['process_limit'] * 100, 2)
            : 0.0;

        return $results;
    }

    public function getGCStats(): array
    {
        $commands = [
            'fullsweep_after' => 'erlang:system_info(fullsweep_after).',
            'garbage_collection' => 'erlang:system_info(garbage_collection).',
        ];

        $results = [];
        foreach ($commands as $key => $command) {
            $results[$key] = $this->executeErlangCommand($command);
        }

        return $results;
    }

    public function getAllocatorInfo(): array
    {
        $command = 'erlang:system_info(allocator).';
        $result = $this->executeErlangCommand($command);

        return $this->parseAllocatorOutput($result);
    }

    public function getRunQueueStats(): array
    {
        $commands = [
            'run_queue' => 'erlang:statistics(run_queue).',
            'total_run_queue_lengths' => 'erlang:statistics(total_run_queue_lengths).',
            // One length per normal scheduler run queue
            'run_queue_lengths' => 'erlang:statistics(run_queue_lengths).',
            // The same list followed by the dirty CPU and dirty I/O run queues
            'run_queue_lengths_all' => 'erlang:statistics(run_queue_lengths_all).',
        ];

        $results = [];
        foreach ($commands as $key => $command) {
            $results[$key] = $this->executeErlangCommand($command);
        }

        // Scalar values become integers; the lists stay as raw Erlang terms
        $results['run_queue'] = $this->toInt($results['run_queue']);
        $results['total_run_queue_lengths'] = $this->toInt($results['total_run_queue_lengths']);

        return $results;
    }

    public function getVMHealthScore(): array
    {
        $memory = $this->getMemoryInfo();
        $processes = $this->getProcessInfo();
        $runQueue = $this->getRunQueueStats();

        $score = 100;
        $issues = [];

        if ($processes['process_usage_percent'] > 80) {
            $score -= 20;
            $issues[] = 'Process count is close to the limit';
        }

        if ($runQueue['run_queue'] > 100) {
            $score -= 15;
            $issues[] = 'Run queue is too long';
        }

        // Skip the ratio when the memory query failed
        if ($memory['total_memory'] > 0) {
            $binaryPercent = $memory['binary_memory'] / $memory['total_memory'] * 100;
            if ($binaryPercent > 50) {
                $score -= 10;
                $issues[] = 'Binary memory share is too high';
            }
        }

        return [
            'score' => max(0, $score),
            'status' => $score >= 80 ? 'healthy' : ($score >= 50 ? 'warning' : 'critical'),
            'issues' => $issues,
        ];
    }

    public function getDetailedReport(): array
    {
        return [
            'timestamp' => date('Y-m-d H:i:s'),
            'scheduler' => $this->getSchedulerInfo(),
            'memory' => $this->getMemoryInfo(),
            'processes' => $this->getProcessInfo(),
            'gc' => $this->getGCStats(),
            'run_queue' => $this->getRunQueueStats(),
            'health' => $this->getVMHealthScore(),
        ];
    }

    private function executeErlangCommand(string $command): string
    {
        $shellCommand = sprintf(
            '%s eval %s 2>/dev/null',
            escapeshellarg($this->rabbitmqctl),
            escapeshellarg($command)
        );
        $output = shell_exec($shellCommand);

        // shell_exec() returns null or false when the command fails or prints nothing
        return is_string($output) ? trim($output) : '';
    }

    private function toInt(string $value): int
    {
        return preg_match('/^-?\d+$/', $value) === 1 ? (int) $value : 0;
    }

    private function parseMemoryValue(string $value): int
    {
        if (preg_match('/(\d+)/', $value, $matches)) {
            return (int) $matches[1];
        }
        return 0;
    }

    private function parseAllocatorOutput(string $output): array
    {
        // Only records which allocators appear in the output;
        // their individual settings are not parsed
        $allocators = [];

        $types = ['temp_alloc', 'sl_alloc', 'std_alloc', 'eheap_alloc', 'binary_alloc', 'ets_alloc'];

        foreach ($types as $type) {
            if (strpos($output, $type) !== false) {
                $allocators[$type] = [
                    'enabled' => true,
                ];
            }
        }

        return $allocators;
    }
}
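ErlangVMMonitor returns most values as the raw text printed by rabbitmqctl eval, which is Erlang term syntax. The hypothetical helpers below sketch how a few simple shapes could be converted into PHP values: a bare integer, a {tag,Integer} tuple and a flat list of integers. Anything else returns null. Erlang's printer may wrap long lists across lines, which the whitespace handling tolerates.

```php
<?php

// Hypothetical parsers for `rabbitmqctl eval` output.
// Handles: 123456 | {fullsweep_after,65535} | [0,2,1]
function parseErlangInt(string $output): ?int
{
    $output = trim($output);
    if (preg_match('/^-?\d+$/', $output) === 1) {
        return (int) $output;
    }
    if (preg_match('/^\{\s*\w+\s*,\s*(-?\d+)\s*\}$/', $output, $m) === 1) {
        return (int) $m[1];
    }
    return null; // e.g. an atom such as "undefined", or an error message
}

function parseErlangIntList(string $output): ?array
{
    $output = trim($output);
    if (preg_match('/^\[\s*(-?\d+(?:\s*,\s*-?\d+)*)?\s*\]$/', $output, $m) !== 1) {
        return null;
    }
    if (!isset($m[1]) || $m[1] === '') {
        return []; // the empty list "[]"
    }
    return array_map('intval', preg_split('/\s*,\s*/', $m[1]));
}
```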

VM tuning advisor

php
<?php

namespace App\RabbitMQ\Monitoring;

class ErlangVMTuningAdvisor
{
    private ErlangVMMonitor $monitor;
    
    public function __construct(ErlangVMMonitor $monitor)
    {
        $this->monitor = $monitor;
    }
    
    public function analyze(): array
    {
        $scheduler = $this->monitor->getSchedulerInfo();
        $memory = $this->monitor->getMemoryInfo();
        $processes = $this->monitor->getProcessInfo();
        
        $recommendations = [];
        
        $recommendations['scheduler'] = $this->analyzeScheduler($scheduler);
        $recommendations['memory'] = $this->analyzeMemory($memory);
        $recommendations['processes'] = $this->analyzeProcesses($processes);
        
        return [
            'current_state' => [
                'scheduler' => $scheduler,
                'memory' => $memory,
                'processes' => $processes,
            ],
            'recommendations' => $recommendations,
            'config_changes' => $this->generateConfigChanges($recommendations),
        ];
    }

    private function analyzeScheduler(array $scheduler): array
    {
        $recommendations = [];
        
        $cpuCores = $this->getCpuCores();
        $currentSchedulers = (int) $scheduler['schedulers'];
        
        if ($currentSchedulers < $cpuCores) {
            $recommendations[] = [
                'type' => 'scheduler_count',
                'current' => $currentSchedulers,
                'recommended' => $cpuCores,
                'reason' => 'The number of schedulers should equal the number of CPU cores',
                'parameter' => "+S {$cpuCores}:{$cpuCores}",
            ];
        }
        
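        // erlang:system_info(scheduler_bind_type) prints "unbound" when no binding was requested
        if ($scheduler['scheduler_bind_type'] === 'unbound') {
            $recommendations[] = [
                'type' => 'scheduler_binding',
                'current' => 'unbound',
                'recommended' => 'db (default bind)',
                'reason' => 'Binding schedulers to CPUs can improve performance',
                'parameter' => '+stbt db',
            ];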
        }
        
        return $recommendations;
    }

    private function analyzeMemory(array $memory): array
    {
        $recommendations = [];
        
        $total = $memory['total_memory'];
        if ($total <= 0) {
            // The memory query failed; there is nothing to analyse
            return $recommendations;
        }

        $binaryPercent = $memory['binary_memory'] / $total * 100;

        if ($binaryPercent > 40) {
            $recommendations[] = [
                'type' => 'binary_allocator',
                'current_percent' => round($binaryPercent, 2),
                'recommended' => 'Tune the binary allocator',
                'reason' => 'Binary data uses a large share of memory',
                'parameter' => '+MBas ageffcbf +MBlmbcs 512',
            ];
        }

        $etsPercent = $memory['ets_memory'] / $total * 100;
        if ($etsPercent > 30) {
            $recommendations[] = [
                'type' => 'ets_allocator',
                'current_percent' => round($etsPercent, 2),
                'recommended' => 'Tune the ETS allocator',
                'reason' => 'ETS tables use a large share of memory',
                'parameter' => '+MEas ageffcbf',
            ];
        }
        
        return $recommendations;
    }

    private function analyzeProcesses(array $processes): array
    {
        $recommendations = [];
        
        $usagePercent = $processes['process_usage_percent'];
        
        if ($usagePercent > 70) {
            // Stay within the emulator's maximum process limit (134217727)
            $recommendedLimit = (int) min(134217727, $processes['process_limit'] * 1.5);
            $recommendations[] = [
                'type' => 'process_limit',
                'current' => $processes['process_limit'],
                'current_usage' => $usagePercent . '%',
                'recommended' => $recommendedLimit,
                'reason' => 'Process usage is too high',
                'parameter' => "+P {$recommendedLimit}",
            ];
        }
        
        return $recommendations;
    }

    private function generateConfigChanges(array $recommendations): array
    {
        $changes = [];
        
        foreach ($recommendations as $category => $items) {
            foreach ($items as $item) {
                if (isset($item['parameter'])) {
                    $changes[] = [
                        'category' => $category,
                        'parameter' => $item['parameter'],
                        'reason' => $item['reason'],
                    ];
                }
            }
        }
        
        return $changes;
    }

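    private function getCpuCores(): int
    {
        // Counts logical CPUs on the host running this PHP code, which must be
        // the broker host anyway because rabbitmqctl eval is run locally.
        // Matching "processor :" lines avoids counting "Processor" in model names.
        if (is_readable('/proc/cpuinfo')) {
            $count = preg_match_all('/^processor\s*:/m', (string) file_get_contents('/proc/cpuinfo'));
            if ($count > 0) {
                return $count;
            }
        }
        return 1;
    }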
}

Performance benchmark script

php
<?php

namespace App\RabbitMQ\Performance;

use App\RabbitMQ\Monitoring\ErlangVMMonitor;

class VMBenchmark
{
    private ErlangVMMonitor $monitor;
    
    public function __construct(ErlangVMMonitor $monitor)
    {
        $this->monitor = $monitor;
    }
    
    public function benchmarkSchedulerPerformance(int $duration = 10): array
    {
        $samples = [];
        $interval = 100000; // pause between samples, in microseconds (100 ms)
        $deadline = microtime(true) + $duration;

        // Each getRunQueueStats() call spawns several rabbitmqctl processes and
        // can take longer than the interval, so stop at a wall-clock deadline
        // instead of running a fixed number of iterations
        while (microtime(true) < $deadline) {
            $runQueue = $this->monitor->getRunQueueStats();
            $samples[] = [
                'timestamp' => microtime(true),
                'run_queue' => (int) $runQueue['run_queue'],
            ];
            usleep($interval);
        }

        return $this->analyzeSchedulerSamples($samples);
    }

    public function benchmarkMemoryGrowth(int $duration = 60): array
    {
        $samples = [];
        $initialMemory = $this->monitor->getMemoryInfo();
        $deadline = microtime(true) + $duration;

        // Roughly one sample per second, bounded by wall-clock time
        while (microtime(true) < $deadline) {
            $memory = $this->monitor->getMemoryInfo();
            $samples[] = [
                'timestamp' => microtime(true),
                'total' => $memory['total_memory'],
                'binary' => $memory['binary_memory'],
                'processes' => $memory['processes_memory'],
            ];
            sleep(1);
        }

        return [
            'initial' => $initialMemory,
            'final' => $this->monitor->getMemoryInfo(),
            'growth_rate' => $this->calculateGrowthRate($samples),
            'samples' => $samples,
        ];
    }

    private function analyzeSchedulerSamples(array $samples): array
    {
        if ($samples === []) {
            // e.g. a duration of 0 produces no samples
            return [
                'avg_run_queue' => 0,
                'max_run_queue' => 0,
                'min_run_queue' => 0,
                'variance' => 0.0,
                'samples_count' => 0,
            ];
        }

        $runQueues = array_column($samples, 'run_queue');

        return [
            'avg_run_queue' => array_sum($runQueues) / count($runQueues),
            'max_run_queue' => max($runQueues),
            'min_run_queue' => min($runQueues),
            'variance' => $this->calculateVariance($runQueues),
            'samples_count' => count($samples),
        ];
    }

    private function calculateGrowthRate(array $samples): array
    {
        $zero = ['total' => 0, 'binary' => 0, 'processes' => 0];
        if (count($samples) < 2) {
            return $zero;
        }

        $first = $samples[0];
        $last = $samples[count($samples) - 1];
        $timeDiff = $last['timestamp'] - $first['timestamp'];
        if ($timeDiff <= 0) {
            return $zero;
        }

        // Average change in bytes per second between the first and last sample
        return [
            'total' => ($last['total'] - $first['total']) / $timeDiff,
            'binary' => ($last['binary'] - $first['binary']) / $timeDiff,
            'processes' => ($last['processes'] - $first['processes']) / $timeDiff,
        ];
    }

    private function calculateVariance(array $data): float
    {
        $mean = array_sum($data) / count($data);
        $variance = 0;
        foreach ($data as $value) {
            $variance += pow($value - $mean, 2);
        }
        return $variance / count($data);
    }
}
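calculateGrowthRate() derives the rate from the first and last sample only, so a single noisy reading at either end skews it. One alternative, sketched here as a hypothetical standalone function, is the ordinary least-squares slope over all samples. The timestamps are centred on their mean first, which keeps the arithmetic well-conditioned for large microtime(true) values.

```php
<?php

// Least-squares slope (bytes per second) over [timestampSeconds, bytes] samples.
function leastSquaresSlope(array $samples): float
{
    $n = count($samples);
    if ($n < 2) {
        return 0.0;
    }

    $meanX = array_sum(array_column($samples, 0)) / $n;
    $meanY = array_sum(array_column($samples, 1)) / $n;

    // Centred sums: sum(dx * dy) and sum(dx * dx)
    $sumXY = 0.0;
    $sumXX = 0.0;
    foreach ($samples as [$x, $y]) {
        $dx = $x - $meanX;
        $sumXY += $dx * ($y - $meanY);
        $sumXX += $dx * $dx;
    }

    return $sumXX > 0.0 ? $sumXY / $sumXX : 0.0;
}
```

To use it with the benchmark's samples, map them to pairs first, e.g. `array_map(fn ($s) => [$s['timestamp'], $s['total']], $samples)`.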

Practical scenarios

Scenario 1: Many concurrent connections

php
<?php

class HighConcurrencyVMTuning
{
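    public function generateConfig(int $expectedConnections): array
    {
        // Rule of thumb: 10 processes and 2 ports per connection, but never below
        // the usual defaults (RabbitMQ's +P 1048576, the VM's +Q 65536) and never
        // above the emulator's maximum (134217727)
        $processLimit = min(134217727, max(1048576, $expectedConnections * 10));
        $portLimit = min(134217727, max(65536, $expectedConnections * 2));

        return [
            // Note: RABBITMQ_SERVER_ERL_ARGS replaces RabbitMQ's default flags
            'RABBITMQ_SERVER_ERL_ARGS' => [
                "+P {$processLimit}",
                "+Q {$portLimit}",
                '+K true',
                '+A 128',
            ],
            'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS' => [
                '+stbt db',
                '+sbwt none',
                '+swt very_low',
            ],
            'description' => "Tuned configuration for {$expectedConnections} connections",
        ];
    }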
}
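The scenario classes return flag lists rather than finished configuration. A hypothetical helper could render the RABBITMQ_* entries of such an array as rabbitmq-env.conf lines and skip other keys such as description and notes. Keep in mind that a RABBITMQ_SERVER_ERL_ARGS line replaces RabbitMQ's default flags.

```php
<?php

// Hypothetical helper: render RABBITMQ_* flag lists from a generateConfig()
// result as VAR="flags" lines for rabbitmq-env.conf.
function renderEnvConf(array $config): string
{
    $lines = [];
    foreach ($config as $name => $value) {
        // Only flag lists stored under environment-variable-style keys
        if (!is_string($name) || strpos($name, 'RABBITMQ_') !== 0 || !is_array($value)) {
            continue;
        }
        $lines[] = sprintf('%s="%s"', $name, implode(' ', $value));
    }

    return implode("\n", $lines) . "\n";
}

echo renderEnvConf([
    'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS' => ['+stbt db', '+sbwt none'],
    'description' => 'example',
]);
// RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+stbt db +sbwt none"
```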

Scenario 2: Large message processing

php
<?php

class LargeMessageVMTuning
{
    public function generateConfig(): array
    {
        return [
            'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS' => [
                '+MBas ageffcbf',
                '+MBlmbcs 1024',
            ],
            'description' => 'Memory allocator tuning for large messages',
            'notes' => [
                'Sets the largest multiblock carrier size of binary_alloc (+MBlmbcs, in KB)',
                'Uses the ageffcbf strategy to reduce fragmentation',
                'Intended for messages larger than 64KB',
            ],
        ];
    }
}

Scenario 3: Many queues

php
<?php

class ManyQueuesVMTuning
{
    public function generateConfig(int $queueCount): array
    {
        return [
            'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS' => [
                '+MEas ageffcbf',
                '+MMscs 512',
            ],
            'rabbitmq_conf' => [
                'queue_master_locator = min-masters',
            ],
            'description' => "Tuned configuration for {$queueCount} queues",
            'notes' => [
                'Tunes the ETS allocator (ets_alloc, flag letter E)',
                'Large numbers of queues can use a lot of ETS memory',
            ],
        ];
    }
}

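Common problems and solutions

Problem 1: Uneven scheduler load

Diagnosis

bash
# scheduler_wall_time is disabled by default: enable it and compare two samples taken 1 s apart in one call
rabbitmqctl eval 'erlang:system_flag(scheduler_wall_time, true), T0 = lists:sort(erlang:statistics(scheduler_wall_time)), timer:sleep(1000), T1 = lists:sort(erlang:statistics(scheduler_wall_time)), [{I, (A1 - A0) / max(1, W1 - W0)} || {{I, A0, W0}, {I, A1, W1}} <- lists:zip(T0, T1)].'
# Alternatively, a breakdown of runtime thread activity:
rabbitmq-diagnostics runtime_thread_stats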
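After measurement is enabled with erlang:system_flag(scheduler_wall_time, true), erlang:statistics(scheduler_wall_time) returns {SchedulerId, ActiveTime, TotalTime} tuples. A scheduler's utilization over an interval is the ratio of the two deltas between samples. The PHP sketch below assumes the tuples have already been parsed into [schedulerId => [active, total]] arrays. The parsing step is not shown, and the function name is hypothetical.

```php
<?php

// Per-scheduler utilization (0.0 to 1.0) between two scheduler_wall_time samples.
// Each sample: [schedulerId => [activeTime, totalTime]].
function schedulerUtilization(array $before, array $after): array
{
    $utilization = [];
    foreach ($after as $id => [$active1, $total1]) {
        if (!isset($before[$id])) {
            continue; // scheduler missing from the first sample
        }
        [$active0, $total0] = $before[$id];
        $totalDelta = $total1 - $total0;
        $utilization[$id] = $totalDelta > 0
            ? (float) ($active1 - $active0) / $totalDelta
            : 0.0;
    }
    ksort($utilization);

    return $utilization;
}
```

A large spread between schedulers, for example 0.9 on some and 0.1 on others, indicates the uneven load this section is about.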

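Solution

bash
# Enable scheduler binding (part of RabbitMQ's default flags unless RABBITMQ_SERVER_ERL_ARGS overrides them)
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+stbt db"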

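Problem 2: Severe memory fragmentation

Diagnosis

bash
# Memory in use as reported by the VM; an OS-level RSS far above erlang:memory(total) suggests fragmentation
rabbitmqctl eval 'erlang:memory().'
# Allocator configuration, and block/carrier statistics for one allocator
rabbitmqctl eval 'erlang:system_info(allocator).'
rabbitmqctl eval 'erlang:system_info({allocator, binary_alloc}).'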
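One rough way to quantify fragmentation for an allocator is to compare the bytes held in allocated blocks with the bytes held in carriers, which are the larger memory areas the allocator obtains from the OS. Both totals appear as blocks_size and carriers_size in the output of erlang:system_info({allocator, Name}), although the exact nesting differs between OTP releases. The helpers and the 0.5 threshold are assumptions for illustration, not an official metric.

```php
<?php

// Share of carrier memory occupied by allocated blocks.
// A low value means the allocator holds far more memory than it uses.
function carrierUsage(int $blocksSize, int $carriersSize): float
{
    if ($carriersSize <= 0) {
        return 1.0; // no carriers, so nothing is wasted
    }
    return $blocksSize / $carriersSize;
}

// Hypothetical threshold; calibrate it against your own baseline.
function looksFragmented(int $blocksSize, int $carriersSize, float $threshold = 0.5): bool
{
    return carrierUsage($blocksSize, $carriersSize) < $threshold;
}
```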

Solution

bash
# Use the ageffcbf strategy for binary_alloc, eheap_alloc and ets_alloc
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+MBas ageffcbf +MHas ageffcbf +MEas ageffcbf"

Problem 3: Frequent GC

Diagnosis

bash
# Cumulative GC statistics since node start: {NumberOfGCs, WordsReclaimed, 0}
rabbitmqctl eval 'erlang:statistics(garbage_collection).'
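erlang:statistics(garbage_collection) returns cumulative counters, so a single reading says little. The rate between two readings shows whether GC is actually frequent. The hypothetical helper below assumes the first two tuple elements of each reading have already been parsed into integers.

```php
<?php

// Rates between two garbage_collection samples taken $seconds apart.
// Each sample: [numberOfGCs, wordsReclaimed].
// Multiply words by erlang:system_info(wordsize) (8 on 64-bit VMs) to get bytes.
function gcRate(array $before, array $after, float $seconds): array
{
    if ($seconds <= 0) {
        throw new InvalidArgumentException('seconds must be positive');
    }

    return [
        'gcs_per_second' => ($after[0] - $before[0]) / $seconds,
        'words_reclaimed_per_second' => ($after[1] - $before[1]) / $seconds,
    ];
}
```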

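Solution

bash
# Raise the default minimum process heap size (+hms, in words) so small processes collect less often.
# It applies to every process, so increase it gradually while watching memory use.
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+hms <words>"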

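Best practices

Schedulers

| Scenario | Recommendation |
| --- | --- |
| CPU-bound | Schedulers = CPU cores |
| I/O-bound | Add dirty I/O schedulers (+SDio) |
| Mixed | Default settings plus scheduler binding |

Memory allocators

| Scenario | Allocator to focus on |
| --- | --- |
| Large messages | binary_alloc |
| Many queues | ets_alloc |
| Many processes | eheap_alloc |

Monitoring metrics

| Metric | Alert threshold |
| --- | --- |
| Run queue length | > 100 |
| Process usage | > 80% |
| Binary memory share | > 50% |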
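The thresholds in the monitoring table can be checked mechanically. The minimal sketch below assumes the metrics have already been collected into a flat array. The key names are hypothetical, and a metric is flagged only when it is strictly above its threshold, matching the ">" in the table.

```php
<?php

// Alert thresholds from the monitoring table.
const ALERT_THRESHOLDS = [
    'run_queue' => 100,            // run queue length
    'process_usage_percent' => 80, // process count / process limit * 100
    'binary_memory_percent' => 50, // binary memory / total memory * 100
];

// Returns the names of the metrics that exceed their threshold.
function evaluateAlerts(array $metrics): array
{
    $alerts = [];
    foreach (ALERT_THRESHOLDS as $metric => $threshold) {
        if (isset($metrics[$metric]) && $metrics[$metric] > $threshold) {
            $alerts[] = $metric;
        }
    }
    return $alerts;
}
```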

Related links