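RabbitMQ Memory Usage Analysis
Overview
Understanding how RabbitMQ uses memory is essential for tuning and troubleshooting. This article breaks down what RabbitMQ's memory consists of, how to monitor it, and how to reduce it.
Key Concepts
Memory Composition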
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ Memory Composition │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Total Memory │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Processes Memory │ │ │
│ │ │ (Erlang process heaps, stacks, mailboxes) │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Binary Memory │ │ │
│ │ │ (message bodies, large data blocks) │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ ETS Memory │ │ │
│ │ │ (queue indexes, routing tables, Mnesia) │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Code Memory │ │ │
│ │ │ (loaded Erlang code) │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Atom Memory │ │ │
│ │ │ (atom table) │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Memory Types in Detail
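| Memory type | Description | Typical share | What to tune |
|---|---|---|---|
| processes | Erlang process memory | 20-40% | Process count, heap size |
| binary | Binary data | 30-60% | Message size and count |
| ets | ETS table memory | 10-30% | Queue count, indexes |
| code | Loaded code | 5-10% | Usually fixed |
| atom | Atom table | <1% | Usually fixed |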
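As a rough illustration of how shares like those in the table are derived, the sketch below turns a map of byte counts into percentages. The function name `memoryShares` and the sample numbers are made up for this example; in practice the input would come from `erlang:memory()` or the management API.

```php
<?php
declare(strict_types=1);

/**
 * Convert per-category byte counts into percentage shares of their sum.
 *
 * @param array<string,int> $bytesByType category name => bytes
 * @return array<string,float> category name => share in percent, largest first
 */
function memoryShares(array $bytesByType): array
{
    $total = array_sum($bytesByType);
    if ($total <= 0) {
        return [];
    }
    $shares = [];
    foreach ($bytesByType as $type => $bytes) {
        $shares[$type] = round($bytes / $total * 100, 1);
    }
    arsort($shares); // largest consumer first, keys preserved
    return $shares;
}

// Hypothetical sample, in bytes.
$sample = ['processes' => 300, 'binary' => 500, 'ets' => 150, 'code' => 45, 'atom' => 5];
foreach (memoryShares($sample) as $type => $percent) {
    printf("%-10s %5.1f%%\n", $type, $percent);
}
```

Sorting by share makes the dominant category stand out, which is usually the first thing to look at when deciding what to tune.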
Memory Allocators
┌─────────────────────────────────────────────────────────────┐
│ Memory Allocator Architecture │
├─────────────────────────────────────────────────────────────┤
│ │
│ Request ──▶ ┌─────────────────────────────────────────┐ │
│ │ Allocator │ │
│ │ ┌─────────────────────────────────┐ │ │
│ │ │ Carrier │ │ │
│ │ │ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │
│ │ │ │Block│ │Block│ │Block│ ... │ │ │
│ │ │ └─────┘ └─────┘ └─────┘ │ │ │
│ │ └─────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────┐ │ │
│ │ │ Carrier │ │ │
│ │ └─────────────────────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
│ │
│ Carrier: a large chunk of memory obtained from the OS │
│ Block: the piece of a carrier handed to the application │
│ │
└─────────────────────────────────────────────────────────────┘
Queue Memory Model
┌─────────────────────────────────────────────────────────────┐
│ Queue Memory Footprint │
├─────────────────────────────────────────────────────────────┤
│ │
│ Regular queue: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Message index (ETS) + message body (binary/disk) │ │
│ │ ~100 bytes per message + message size │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Lazy queue: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Message index (ETS) + message body (disk) │ │
│ │ ~100 bytes per message + very little memory │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Rough estimates: │
│ - Index entry per message: ~100-200 bytes │
│ - Base cost per queue: ~10-50 KB │
│ - Per connection: ~100 KB │
│ - Per channel: ~50 KB │
│ │
└─────────────────────────────────────────────────────────────┘
Configuration Examples
Memory Monitoring Configuration
ini
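# /etc/rabbitmq/rabbitmq.conf
# Memory high watermark (relative to available RAM)
vm_memory_high_watermark.relative = 0.6
# Memory high watermark (absolute value); use instead of the relative form
# vm_memory_high_watermark.absolute = 4GB
# Fraction of the watermark at which queues start paging messages to disk
vm_memory_high_watermark_paging_ratio = 0.75
# Memory calculation strategy
# rss: resident set size as reported by the operating system
# allocated: memory allocated by the Erlang runtime's allocators
# legacy: memory the Erlang VM reports as used (least accurate)
vm_memory_calculation_strategy = rss
Memory Allocator Configuration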
bash
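# /etc/rabbitmq/rabbitmq-env.conf
# Variables in this file are written without the RABBITMQ_ prefix.
# Binary allocator: allocation strategy and largest multiblock carrier size (KB)
SERVER_ADDITIONAL_ERL_ARGS="+MBas ageffcbf +MBlmbcs 512"
# Heap allocator: allocation strategy
SERVER_ADDITIONAL_ERL_ARGS="$SERVER_ADDITIONAL_ERL_ARGS +MHas ageffcbf"
# ETS allocator: allocation strategy and largest multiblock carrier size (KB)
SERVER_ADDITIONAL_ERL_ARGS="$SERVER_ADDITIONAL_ERL_ARGS +MEas ageffcbf +MElmbcs 512"
PHP Code Examples
Memory Analysis Utility Class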
php
<?php
namespace App\RabbitMQ\Memory;

use PhpAmqpLib\Connection\AMQPStreamConnection;

class MemoryAnalyzer
{
    private AMQPStreamConnection $connection;
    private string $apiHost;
    private int $apiPort;
    private string $apiUser;
    private string $apiPass;
    public function __construct(
        AMQPStreamConnection $connection,
        string $apiHost = 'localhost',
        int $apiPort = 15672,
        string $apiUser = 'guest',
        string $apiPass = 'guest'
    ) {
        $this->connection = $connection;
        $this->apiHost = $apiHost;
        $this->apiPort = $apiPort;
        $this->apiUser = $apiUser;
        $this->apiPass = $apiPass;
    }
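    public function getMemoryBreakdown(): array
    {
        $nodes = $this->apiRequest('/api/nodes');
        if (empty($nodes)) {
            return ['error' => 'Unable to fetch node information'];
        }
        // Only the first node in the listing is inspected.
        $node = $nodes[0];
        // The per-category breakdown has to be requested per node with ?memory=true.
        $detail = $this->apiRequest('/api/nodes/' . rawurlencode($node['name'] ?? '') . '?memory=true');
        // Keep integer byte counts only; some versions also return non-scalar entries.
        $memory = is_array($detail['memory'] ?? null) ? array_filter($detail['memory'], 'is_int') : [];
        $used = (int) ($node['mem_used'] ?? 0);
        $limit = (int) ($node['mem_limit'] ?? 0);
        return [
            'total' => $used,
            'total_human' => $this->formatBytes($used),
            'limit' => $limit,
            'limit_human' => $this->formatBytes($limit),
            'usage_percent' => $this->calculateUsagePercent($used, $limit),
            'breakdown' => $this->parseMemoryBreakdown($memory),
            'alarm' => $node['mem_alarm'] ?? false,
        ];
    }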
    public function getQueueMemoryUsage(): array
    {
        $queues = $this->apiRequest('/api/queues?columns=name,memory,messages');
        $result = [];
        $totalMemory = 0;
        foreach ($queues as $queue) {
            $result[$queue['name']] = [
                'memory' => $queue['memory'] ?? 0,
                'memory_human' => $this->formatBytes($queue['memory'] ?? 0),
                'messages' => $queue['messages'] ?? 0,
                'bytes_per_message' => $this->calculateBytesPerMessage(
                    $queue['memory'] ?? 0,
                    $queue['messages'] ?? 0
                ),
            ];
            $totalMemory += $queue['memory'] ?? 0;
        }
        // Largest queues first.
        uasort($result, function ($a, $b) {
            return $b['memory'] <=> $a['memory'];
        });
        return [
            'queues' => $result,
            'total_memory' => $totalMemory,
            'total_memory_human' => $this->formatBytes($totalMemory),
            'queue_count' => count($result),
        ];
    }
    public function getConnectionMemoryUsage(): array
    {
        $connections = $this->apiRequest('/api/connections?columns=name,client_properties,memory,channels');
        $result = [];
        $totalMemory = 0;
        foreach ($connections as $conn) {
            // Fall back to the connection's own name so that clients without a
            // connection_name do not overwrite each other under one 'unknown' key.
            $clientName = $conn['client_properties']['connection_name'] ?? $conn['name'] ?? 'unknown';
            $result[$clientName] = [
                'memory' => $conn['memory'] ?? 0,
                'memory_human' => $this->formatBytes($conn['memory'] ?? 0),
                'channels' => $conn['channels'] ?? 0,
            ];
            $totalMemory += $conn['memory'] ?? 0;
        }
        return [
            'connections' => $result,
            'total_memory' => $totalMemory,
            'total_memory_human' => $this->formatBytes($totalMemory),
            'connection_count' => count($result),
        ];
    }
    public function getErlangMemoryInfo(): array
    {
        $memoryTypes = [
            'total' => 'erlang:memory(total).',
            'processes' => 'erlang:memory(processes).',
            'processes_used' => 'erlang:memory(processes_used).',
            'binary' => 'erlang:memory(binary).',
            'ets' => 'erlang:memory(ets).',
            'atom' => 'erlang:memory(atom).',
            'atom_used' => 'erlang:memory(atom_used).',
            'code' => 'erlang:memory(code).',
        ];
        $result = [];
        foreach ($memoryTypes as $type => $command) {
            // Requires rabbitmqctl on the PATH and permission to run it on this host.
            $output = shell_exec('rabbitmqctl eval ' . escapeshellarg($command) . ' 2>/dev/null');
            $result[$type] = $this->parseErlangValue($output);
        }
        return [
            'memory' => $result,
            'percentages' => $this->calculateMemoryPercentages($result),
        ];
    }
    public function analyzeMemoryTrend(int $samples = 10, int $intervalMs = 1000): array
    {
        $trend = [];
        for ($i = 0; $i < $samples; $i++) {
            $breakdown = $this->getMemoryBreakdown();
            $trend[] = [
                'timestamp' => microtime(true),
                // A failed API call yields no 'total', so default to 0.
                'total' => $breakdown['total'] ?? 0,
                'binary' => $breakdown['breakdown']['binary'] ?? 0,
                'processes' => $breakdown['breakdown']['processes'] ?? 0,
                'ets' => $breakdown['breakdown']['ets'] ?? 0,
            ];
            // No need to wait after the last sample.
            if ($i < $samples - 1) {
                usleep($intervalMs * 1000);
            }
        }
        return [
            'samples' => $trend,
            'analysis' => $this->analyzeTrendData($trend),
        ];
    }
    public function getMemoryRecommendations(): array
    {
        $breakdown = $this->getMemoryBreakdown();
        if (isset($breakdown['error'])) {
            return [];
        }
        $recommendations = [];
        if ($breakdown['usage_percent'] > 80) {
            $recommendations[] = [
                'priority' => 'critical',
                'category' => 'memory',
                'issue' => 'Memory usage is too high',
                'current' => $breakdown['usage_percent'] . '%',
                'recommendation' => 'Add memory or reduce the message backlog',
            ];
        }
        $total = $breakdown['total'] ?: 1;
        $binaryPercent = ($breakdown['breakdown']['binary'] ?? 0) / $total * 100;
        if ($binaryPercent > 50) {
            $recommendations[] = [
                'priority' => 'high',
                'category' => 'binary',
                'issue' => 'Binary memory share is too high',
                'current' => round($binaryPercent, 2) . '%',
                'recommendation' => 'Consider lazy queues or smaller messages',
            ];
        }
        $etsPercent = ($breakdown['breakdown']['ets'] ?? 0) / $total * 100;
        if ($etsPercent > 30) {
            $recommendations[] = [
                'priority' => 'medium',
                'category' => 'ets',
                'issue' => 'ETS memory share is too high',
                'current' => round($etsPercent, 2) . '%',
                'recommendation' => 'Reduce the number of queues or trim queue indexes',
            ];
        }
        return $recommendations;
    }
    private function apiRequest(string $endpoint): array
    {
        $url = "http://{$this->apiHost}:{$this->apiPort}{$endpoint}";
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "{$this->apiUser}:{$this->apiPass}");
        curl_setopt($ch, CURLOPT_TIMEOUT, 10);
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        // Any non-200 response (including connection failures) is treated as "no data".
        if ($httpCode !== 200) {
            return [];
        }
        return json_decode($response, true) ?: [];
    }
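    private function parseMemoryBreakdown(array $memory): array
    {
        // Category names as reported by the management API. The exact set varies
        // between RabbitMQ versions, so missing or non-integer entries are skipped.
        $tables = ['mnesia', 'msg_index', 'mgmt_db', 'metrics', 'other_ets'];
        $procs = [
            'connection_readers', 'connection_writers', 'connection_channels', 'connection_other',
            'queue_procs', 'queue_slave_procs', 'quorum_queue_procs', 'plugins', 'other_proc',
        ];
        $other = ['binary', 'code', 'atom', 'other_system'];
        $breakdown = [];
        foreach (array_merge($tables, $procs, $other) as $type) {
            if (isset($memory[$type]) && is_int($memory[$type])) {
                $breakdown[$type] = $memory[$type];
            }
        }
        // Aggregates read by analyzeMemoryTrend() and getMemoryRecommendations().
        $breakdown['ets'] = array_sum(array_intersect_key($breakdown, array_flip($tables)));
        $breakdown['processes'] = array_sum(array_intersect_key($breakdown, array_flip($procs)));
        return $breakdown;
    }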
    private function parseErlangValue(?string $output): int
    {
        // rabbitmqctl eval prints the bare integer; take the first run of digits.
        if ($output && preg_match('/(\d+)/', $output, $matches)) {
            return (int) $matches[1];
        }
        return 0;
    }
    private function calculateMemoryPercentages(array $memory): array
    {
        $total = $memory['total'] ?: 1;
        $percentages = [];
        foreach ($memory as $type => $value) {
            if ($type !== 'total') {
                $percentages[$type] = round($value / $total * 100, 2);
            }
        }
        return $percentages;
    }
    private function calculateUsagePercent(int $used, int $limit): float
    {
        if ($limit === 0) {
            return 0.0;
        }
        return round($used / $limit * 100, 2);
    }
    private function calculateBytesPerMessage(int $memory, int $messages): float
    {
        if ($messages === 0) {
            return 0.0;
        }
        return round($memory / $messages, 2);
    }
    private function analyzeTrendData(array $trend): array
    {
        if (count($trend) < 2) {
            return [];
        }
        $totals = array_column($trend, 'total');
        $first = $totals[0];
        $last = $totals[count($totals) - 1];
        $growth = $last - $first;
        // N samples span N - 1 intervals.
        $growthRate = $growth / (count($totals) - 1);
        return [
            'start_memory' => $first,
            'end_memory' => $last,
            'growth' => $growth,
            'growth_human' => $this->formatBytes(abs($growth)),
            'growth_direction' => $growth > 0 ? 'increasing' : ($growth < 0 ? 'decreasing' : 'stable'),
            'growth_rate_per_sample' => $this->formatBytes((int) round(abs($growthRate))),
        ];
    }
    // Accepts float so that averaged or scaled values can be formatted as well.
    private function formatBytes(float $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}
Memory Monitoring Service
php
<?php
namespace App\RabbitMQ\Memory;

class MemoryMonitorService
{
    private MemoryAnalyzer $analyzer;
    private array $thresholds;
    public function __construct(MemoryAnalyzer $analyzer, array $thresholds = [])
    {
        $this->analyzer = $analyzer;
        // Percent of the memory limit; caller-supplied values override the defaults.
        $this->thresholds = array_merge([
            'warning' => 70,
            'critical' => 85,
            'emergency' => 95,
        ], $thresholds);
    }
    public function check(): array
    {
        $breakdown = $this->analyzer->getMemoryBreakdown();
        $timestamp = date('Y-m-d H:i:s');
        if (isset($breakdown['error'])) {
            // The management API was unreachable, so usage cannot be judged.
            return [
                'status' => 'unknown',
                'usage_percent' => 0.0,
                'breakdown' => $breakdown,
                'alerts' => [],
                'timestamp' => $timestamp,
            ];
        }
        $usage = $breakdown['usage_percent'];
        $status = 'normal';
        $alerts = [];
        // Check the most severe threshold first.
        foreach (['emergency', 'critical', 'warning'] as $level) {
            if ($usage >= $this->thresholds[$level]) {
                $status = $level;
                $alerts[] = $this->createAlert($level, $usage);
                break;
            }
        }
        return [
            'status' => $status,
            'usage_percent' => $usage,
            'breakdown' => $breakdown,
            'alerts' => $alerts,
            'timestamp' => $timestamp,
        ];
    }
    public function getDetailedReport(): array
    {
        return [
            'memory' => $this->analyzer->getMemoryBreakdown(),
            'queues' => $this->analyzer->getQueueMemoryUsage(),
            'connections' => $this->analyzer->getConnectionMemoryUsage(),
            'erlang' => $this->analyzer->getErlangMemoryInfo(),
            'recommendations' => $this->analyzer->getMemoryRecommendations(),
        ];
    }
    // Blocks forever; intended to run as a long-lived CLI process.
    public function startContinuousMonitoring(int $intervalSeconds = 60): void
    {
        while (true) {
            $check = $this->check();
            if ($check['status'] !== 'normal') {
                $this->handleAlert($check);
            }
            $this->logMemoryStatus($check);
            sleep($intervalSeconds);
        }
    }
    private function createAlert(string $level, float $usage): array
    {
        return [
            'level' => $level,
            'message' => "Memory usage reached {$usage}%",
            'timestamp' => date('Y-m-d H:i:s'),
            'threshold' => $this->thresholds[$level],
        ];
    }
    private function handleAlert(array $check): void
    {
        // Send an alert notification (here it is only written to the PHP error log).
        error_log("RabbitMQ Memory Alert: " . json_encode($check));
    }
    private function logMemoryStatus(array $check): void
    {
        // Append the memory status to a log file; the process needs write access to it.
        $logEntry = sprintf(
            "[%s] Memory: %.2f%% - Status: %s\n",
            $check['timestamp'],
            $check['usage_percent'],
            $check['status']
        );
        file_put_contents('/var/log/rabbitmq_memory.log', $logEntry, FILE_APPEND);
    }
}
Memory Usage Report Generator
php
<?php
namespace App\RabbitMQ\Memory;

class MemoryReportGenerator
{
    private MemoryAnalyzer $analyzer;
    public function __construct(MemoryAnalyzer $analyzer)
    {
        $this->analyzer = $analyzer;
    }
    public function generateReport(string $format = 'text'): string
    {
        $data = $this->gatherData();
        switch ($format) {
            case 'json':
                return json_encode($data, JSON_PRETTY_PRINT);
            case 'html':
                return $this->generateHtmlReport($data);
            case 'text':
            default:
                return $this->generateTextReport($data);
        }
    }
    private function gatherData(): array
    {
        return [
            'generated_at' => date('Y-m-d H:i:s'),
            'memory_breakdown' => $this->analyzer->getMemoryBreakdown(),
            'queue_memory' => $this->analyzer->getQueueMemoryUsage(),
            'connection_memory' => $this->analyzer->getConnectionMemoryUsage(),
            'erlang_memory' => $this->analyzer->getErlangMemoryInfo(),
            'recommendations' => $this->analyzer->getMemoryRecommendations(),
        ];
    }
    private function generateTextReport(array $data): string
    {
        $report = [];
        // PHP has no string * int operator; str_repeat() builds the rule line.
        $report[] = str_repeat('=', 60);
        $report[] = "RabbitMQ Memory Analysis Report";
        $report[] = "Generated: " . $data['generated_at'];
        $report[] = str_repeat('=', 60);
        $report[] = "";
        $breakdown = $data['memory_breakdown'];
        $report[] = "## Memory Overview";
        $report[] = "Total Used: " . $breakdown['total_human'];
        $report[] = "Memory Limit: " . $breakdown['limit_human'];
        $report[] = "Usage: " . $breakdown['usage_percent'] . "%";
        $report[] = "Alarm Status: " . ($breakdown['alarm'] ? 'ACTIVE' : 'Normal');
        $report[] = "";
        $report[] = "## Memory Breakdown";
        foreach ($breakdown['breakdown'] as $type => $value) {
            $percent = round($value / ($breakdown['total'] ?: 1) * 100, 2);
            $report[] = sprintf(" %-20s: %s (%.2f%%)", $type, $this->formatBytes($value), $percent);
        }
        $report[] = "";
        $queueMemory = $data['queue_memory'];
        $report[] = "## Top 10 Queues by Memory";
        // The queue list is already sorted by memory, largest first.
        $topQueues = array_slice($queueMemory['queues'], 0, 10, true);
        foreach ($topQueues as $name => $info) {
            $report[] = sprintf(" %-40s: %s (%d messages)",
                $name, $info['memory_human'], $info['messages']);
        }
        $report[] = "";
        if (!empty($data['recommendations'])) {
            $report[] = "## Recommendations";
            foreach ($data['recommendations'] as $rec) {
                $report[] = " [{$rec['priority']}] {$rec['issue']}";
                $report[] = " Current: {$rec['current']}";
                $report[] = " Action: {$rec['recommendation']}";
            }
        }
        return implode("\n", $report);
    }
    private function generateHtmlReport(array $data): string
    {
        $html = '<!DOCTYPE html><html><head><title>RabbitMQ Memory Report</title>';
        $html .= '<style>body{font-family:Arial,sans-serif;margin:20px;}';
        $html .= 'table{border-collapse:collapse;width:100%;margin:10px 0;}';
        $html .= 'th,td{border:1px solid #ddd;padding:8px;text-align:left;}';
        $html .= 'th{background-color:#4CAF50;color:white;}';
        $html .= '.warning{background-color:#fff3cd;}';
        $html .= '.critical{background-color:#f8d7da;}</style></head><body>';
        $html .= '<h1>RabbitMQ Memory Analysis Report</h1>';
        $html .= '<p>Generated: ' . htmlspecialchars($data['generated_at']) . '</p>';
        $breakdown = $data['memory_breakdown'];
        $html .= '<h2>Memory Overview</h2>';
        $html .= '<table><tr><th>Metric</th><th>Value</th></tr>';
        $html .= '<tr><td>Total Used</td><td>' . $breakdown['total_human'] . '</td></tr>';
        $html .= '<tr><td>Memory Limit</td><td>' . $breakdown['limit_human'] . '</td></tr>';
        $html .= '<tr><td>Usage</td><td>' . $breakdown['usage_percent'] . '%</td></tr>';
        $html .= '</table>';
        $html .= '<h2>Memory Breakdown</h2>';
        $html .= '<table><tr><th>Type</th><th>Size</th><th>Percentage</th></tr>';
        foreach ($breakdown['breakdown'] as $type => $value) {
            $percent = round($value / ($breakdown['total'] ?: 1) * 100, 2);
            // Escape the category name because it comes from the API response.
            $html .= '<tr><td>' . htmlspecialchars((string) $type) . '</td><td>' . $this->formatBytes($value) . '</td><td>' . $percent . '%</td></tr>';
        }
        $html .= '</table>';
        $html .= '</body></html>';
        return $html;
    }
    private function formatBytes(float $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}
Practical Scenarios
Scenario 1: Tracking Down a Memory Leak
php
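<?php
namespace App\RabbitMQ\Memory;

class MemoryLeakDetector
{
    private MemoryAnalyzer $analyzer;
    private array $baseline = [];
    public function __construct(MemoryAnalyzer $analyzer)
    {
        $this->analyzer = $analyzer;
    }
    public function setBaseline(): void
    {
        $this->baseline = $this->analyzer->getMemoryBreakdown();
    }
    public function detectLeak(): array
    {
        if (!isset($this->baseline['total'])) {
            throw new \LogicException('Call setBaseline() successfully before detectLeak().');
        }
        $current = $this->analyzer->getMemoryBreakdown();
        if (!isset($current['total'])) {
            return ['error' => 'Unable to fetch current memory usage'];
        }
        $growth = $current['total'] - $this->baseline['total'];
        return [
            'baseline' => $this->baseline['total_human'],
            'current' => $current['total_human'],
            // MemoryAnalyzer::formatBytes() is private, so growth is reported in MB here.
            'growth' => round(abs($growth) / 1024 / 1024, 2) . ' MB',
            // Growth above 100 MB is flagged; this is a heuristic, not proof of a leak.
            'potential_leak' => $growth > 100 * 1024 * 1024,
        ];
    }
}
Scenario 2: Capacity Planning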
php
<?php
class MemoryCapacityPlanner
{
    public function estimateRequirements(array $params): array
    {
        $connections = $params['connections'] ?? 100;
        $channels = $params['channels'] ?? 500;
        $queues = $params['queues'] ?? 100;
        $messagesPerQueue = $params['messages_per_queue'] ?? 10000;
        $avgMessageSize = $params['avg_message_size'] ?? 1024;
        // Per-item costs follow the rough estimates in the queue memory model.
        $connectionMemory = $connections * 100 * 1024;   // ~100 KB per connection
        $channelMemory = $channels * 50 * 1024;          // ~50 KB per channel
        $queueBaseMemory = $queues * 30 * 1024;          // ~30 KB per queue
        $messageIndexMemory = $queues * $messagesPerQueue * 150; // ~150 bytes per index entry
        // Assumes roughly 10% of message bodies are held in RAM at any time.
        $messageBodyMemory = $queues * $messagesPerQueue * $avgMessageSize * 0.1;
        $totalEstimate = $connectionMemory + $channelMemory + $queueBaseMemory +
            $messageIndexMemory + $messageBodyMemory;
        return [
            'estimated_memory' => $totalEstimate,
            'estimated_memory_human' => $this->formatBytes($totalEstimate),
            // Doubling the estimate leaves headroom for spikes and garbage collection.
            'recommended_memory' => $totalEstimate * 2,
            'recommended_memory_human' => $this->formatBytes($totalEstimate * 2),
            'breakdown' => [
                'connections' => $this->formatBytes($connectionMemory),
                'channels' => $this->formatBytes($channelMemory),
                'queue_base' => $this->formatBytes($queueBaseMemory),
                'message_index' => $this->formatBytes($messageIndexMemory),
                'message_body' => $this->formatBytes($messageBodyMemory),
            ],
        ];
    }
    // Takes float because the estimates above are fractional.
    private function formatBytes(float $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}
Common Problems and Solutions
Problem 1: Memory Keeps Growing
Diagnostic steps:
bash
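# Show the memory details
rabbitmqctl status | grep -i -A 50 memory
# Show per-queue memory
rabbitmqctl list_queues name memory messages
Solutions:
- Check whether consumers have disconnected
- Check whether the queues have a TTL set
- Consider using lazy queues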
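The first check in the list can be sketched as a small filter over queue rows. It assumes rows shaped like the management API's `/api/queues` output with the `name`, `messages` and `consumers` fields; the function name `findUnconsumedQueues` and the sample rows are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Return queues that hold messages but have no consumers attached.
 *
 * @param array<int,array<string,mixed>> $queues rows with name, messages, consumers
 * @return array<string,int> queue name => message count, largest backlog first
 */
function findUnconsumedQueues(array $queues, int $minMessages = 1): array
{
    $stalled = [];
    foreach ($queues as $queue) {
        $messages = (int) ($queue['messages'] ?? 0);
        $consumers = (int) ($queue['consumers'] ?? 0);
        if ($consumers === 0 && $messages >= $minMessages) {
            $stalled[(string) ($queue['name'] ?? '(unnamed)')] = $messages;
        }
    }
    arsort($stalled); // biggest backlog first
    return $stalled;
}

// Hypothetical rows, as they might be decoded from the API response.
$rows = [
    ['name' => 'orders', 'messages' => 120000, 'consumers' => 0],
    ['name' => 'emails', 'messages' => 15, 'consumers' => 3],
    ['name' => 'audit', 'messages' => 900, 'consumers' => 0],
];
print_r(findUnconsumedQueues($rows));
```

Queues that show up here are the usual suspects for steady memory growth, since nothing is draining them.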
Problem 2: Binary Memory Is Too High
Solutions:
bash
# Force garbage collection on all processes of the node
rabbitmqctl force_gc
# Show total binary memory in bytes
rabbitmqctl eval 'erlang:memory(binary).'
Best Practices
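Memory Monitoring
| Metric | Alert threshold | Suggested action |
|---|---|---|
| Total memory usage | > 80% | Scale up or clean up |
| Binary share | > 50% | Check for message backlog |
| ETS share | > 30% | Reduce the number of queues |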
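One way to keep these thresholds in a single place is to store them as data and evaluate every metric in a loop. This is only a sketch: the constant `MEMORY_THRESHOLDS`, the function `breachedThresholds` and the metric keys are names invented for the example.

```php
<?php
declare(strict_types=1);

/** Thresholds from the table above: metric => [limit in percent, suggested action]. */
const MEMORY_THRESHOLDS = [
    'total_usage'  => [80.0, 'scale up or clean up'],
    'binary_share' => [50.0, 'check for message backlog'],
    'ets_share'    => [30.0, 'reduce the number of queues'],
];

/**
 * Return a message for every metric that exceeds its threshold.
 *
 * @param array<string,float|int> $metrics metric name => value in percent
 * @return array<string,string> metric name => description of the breach
 */
function breachedThresholds(array $metrics): array
{
    $breached = [];
    foreach (MEMORY_THRESHOLDS as $metric => [$limit, $action]) {
        $value = (float) ($metrics[$metric] ?? 0.0);
        if ($value > $limit) {
            $breached[$metric] = sprintf('%.1f%% > %.0f%%: %s', $value, $limit, $action);
        }
    }
    return $breached;
}

// Hypothetical reading: only total usage is over its limit.
print_r(breachedThresholds(['total_usage' => 85, 'binary_share' => 40, 'ets_share' => 30]));
```

Keeping the limits in one table means a change to a threshold does not require touching the evaluation logic.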
Memory Optimization
| Scenario | Optimization |
|---|---|
| Many messages | Use lazy queues |
| Large messages | Compress or split them |
| Many queues | Merge or remove them |
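The "compress or split" row can be sketched on the publisher side as follows. It assumes PHP's zlib extension is available; `packPayload` and `unpackPayload` are hypothetical helpers, and how chunks are numbered, published and reassembled across messages is left to the application, since RabbitMQ itself does not do this.

```php
<?php
declare(strict_types=1);

/**
 * Gzip a message body and cut the result into chunks of at most $chunkSize bytes.
 *
 * @return string[] compressed chunks, in order
 */
function packPayload(string $body, int $chunkSize): array
{
    if ($chunkSize < 1) {
        throw new InvalidArgumentException('chunkSize must be at least 1');
    }
    $compressed = gzencode($body, 6);
    if ($compressed === false) {
        throw new RuntimeException('Compression failed');
    }
    return str_split($compressed, $chunkSize);
}

/**
 * Reassemble chunks produced by packPayload() and decompress them.
 *
 * @param string[] $chunks chunks in their original order
 */
function unpackPayload(array $chunks): string
{
    $body = gzdecode(implode('', $chunks));
    if ($body === false) {
        throw new RuntimeException('Decompression failed');
    }
    return $body;
}

// A repetitive payload compresses well; real-world ratios depend on the data.
$original = str_repeat('{"event":"order.created","status":"ok"}', 5000);
$chunks = packPayload($original, 1024);
printf("%d bytes -> %d chunk(s)\n", strlen($original), count($chunks));
```

Compression trades CPU time on both publisher and consumer for less binary memory on the broker, so it tends to pay off for large, text-like payloads.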
